// consensus_core/core.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet},
    iter,
    sync::Arc,
    time::Duration,
    vec,
};

use consensus_config::{AuthorityIndex, ProtocolKeyPair};
#[cfg(test)]
use consensus_config::{Stake, local_committee_and_keys};
use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
use itertools::Itertools as _;
#[cfg(test)]
use mysten_metrics::monitored_mpsc::UnboundedReceiver;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
use sui_macros::fail_point;
use tokio::{
    sync::{broadcast, watch},
    time::Instant,
};
use tracing::{debug, info, trace, warn};

#[cfg(test)]
use crate::{
    CommitConsumerArgs, TransactionClient, block_verifier::NoopBlockVerifier,
    storage::mem_store::MemStore,
};
use crate::{
    ancestor::{AncestorState, AncestorStateManager},
    block::{
        Block, BlockAPI, BlockV1, BlockV2, ExtendedBlock, GENESIS_ROUND, SignedBlock, Slot,
        VerifiedBlock,
    },
    block_manager::BlockManager,
    commit::{
        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
    },
    commit_observer::CommitObserver,
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    round_tracker::RoundTracker,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    transaction::TransactionConsumer,
    transaction_certifier::TransactionCertifier,
    universal_committer::{
        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
    },
};

// Maximum number of commit votes to include in a block.
// TODO: Move to protocol config, and verify in BlockVerifier.
const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;

pub(crate) struct Core {
    context: Arc<Context>,
    /// The consumer used to pull transactions to be included in the next proposals.
    transaction_consumer: TransactionConsumer,
    /// Tracks the reject votes on transactions that proposed blocks should include.
    transaction_certifier: TransactionCertifier,
    /// The block manager is responsible for keeping track of the DAG dependencies when processing new blocks,
    /// accepting them or suspending them if their causal history is missing.
    block_manager: BlockManager,
    /// Estimated delay by round for propagating blocks to a quorum.
    /// Because of the nature of TCP and block streaming, propagation delay is expected to be
    /// 0 in most cases, even when the actual latency of broadcasting blocks is high.
    /// When this value is higher than the `propagation_delay_stop_proposal_threshold`,
    /// most likely this validator cannot broadcast blocks to the network at all.
    /// Core stops proposing new blocks in this case.
    propagation_delay: Round,
    /// Used to make commit decisions for leader blocks in the dag.
    committer: UniversalCommitter,
    /// The last new round for which core has sent out a signal.
    last_signaled_round: Round,
    /// The blocks of the last included ancestors per authority. This vector is used as a
    /// watermark so that the next block proposal only includes ancestors of higher rounds.
    /// By default, it is initialised with `None` values.
    last_included_ancestors: Vec<Option<BlockRef>>,
    /// The last decided leader returned from the universal committer. Important to note
    /// that this does not signify that the leader has been persisted yet as it still has
    /// to go through CommitObserver and persist the commit in store. On recovery/restart
    /// the last_decided_leader will be set to the last_commit leader in dag state.
    last_decided_leader: Slot,
    /// The consensus leader schedule to be used to resolve the leader for a
    /// given round.
    leader_schedule: Arc<LeaderSchedule>,
    /// The commit observer is responsible for observing the commits and collecting
    /// + sending subdags over the consensus output channel.
    commit_observer: CommitObserver,
    /// Sender of outgoing signals from Core.
    signals: CoreSignals,
    /// The keypair to be used for block signing
    block_signer: ProtocolKeyPair,
    /// Keeping track of state of the DAG, including blocks, commits and last committed rounds.
    dag_state: Arc<RwLock<DagState>>,
    /// The last known round for which the node has proposed. Any proposal should be for a round greater than this.
    /// It is currently used to avoid equivocation when a node recovers from amnesia. A `None` value means that
    /// the last block sync mechanism is enabled but hasn't been initialised yet.
    last_known_proposed_round: Option<Round>,
    // The ancestor state manager will keep track of the quality of the authorities
    // based on the distribution of their blocks to the network. It will use this
    // information to decide whether to include that authority block in the next
    // proposal or not.
    ancestor_state_manager: AncestorStateManager,
    // The round tracker will keep track of the highest received and accepted rounds
    // from all authorities. It will use this information to then calculate the
    // quorum rounds periodically which is used across other components to make
    // decisions about block proposals.
    round_tracker: Arc<RwLock<RoundTracker>>,
}

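// Core is driven by its callers: accepted peer blocks arrive via `add_blocks()`, leader
// timeouts trigger `new_block()`, and commits synced from peers arrive via
// `add_certified_commits()`. Each entry point ends by attempting to commit, propose and
// signal the new threshold clock round.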
impl Core {
    pub(crate) fn new(
        context: Arc<Context>,
        leader_schedule: Arc<LeaderSchedule>,
        transaction_consumer: TransactionConsumer,
        transaction_certifier: TransactionCertifier,
        block_manager: BlockManager,
        commit_observer: CommitObserver,
        signals: CoreSignals,
        block_signer: ProtocolKeyPair,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
        round_tracker: Arc<RwLock<RoundTracker>>,
    ) -> Self {
        let last_decided_leader = dag_state.read().last_commit_leader();
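        // The number of leaders per round is taken from the protocol config, defaulting to a
        // single leader per round when it is not set.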
        let number_of_leaders = context
            .protocol_config
            .mysticeti_num_leaders_per_round()
            .unwrap_or(1);
        let committer = UniversalCommitterBuilder::new(
            context.clone(),
            leader_schedule.clone(),
            dag_state.clone(),
        )
        .with_number_of_leaders(number_of_leaders)
        .with_pipeline(true)
        .build();

        let last_proposed_block = dag_state.read().get_last_proposed_block();

        let last_signaled_round = last_proposed_block.round();

        // Recover the last included ancestor rounds based on the last proposed block. That allows
        // the next block proposal to use ancestor blocks of higher rounds and avoids
        // re-including blocks that have already been included in the last (or an earlier) block proposal.
        // This is only strongly guaranteed for a quorum of ancestors. It is still possible to re-include
        // a block from an authority which hadn't been added as part of the last proposal, hence its
        // latest included ancestor is not accurately captured here. This is considered a small deficiency,
        // and it mostly matters just for this next proposal without any actual penalties in performance
        // or block proposal.
        let mut last_included_ancestors = vec![None; context.committee.size()];
        for ancestor in last_proposed_block.ancestors() {
            last_included_ancestors[ancestor.author] = Some(*ancestor);
        }

        let min_propose_round = if sync_last_known_own_block {
            None
        } else {
            // If the sync is disabled then we practically don't want to impose any restriction.
            Some(0)
        };

        let propagation_scores = leader_schedule
            .leader_swap_table
            .read()
            .reputation_scores
            .clone();
        let mut ancestor_state_manager =
            AncestorStateManager::new(context.clone(), dag_state.clone());
        ancestor_state_manager.set_propagation_scores(propagation_scores);

        Self {
            context,
            last_signaled_round,
            last_included_ancestors,
            last_decided_leader,
            leader_schedule,
            transaction_consumer,
            transaction_certifier,
            block_manager,
            propagation_delay: 0,
            committer,
            commit_observer,
            signals,
            block_signer,
            dag_state,
            last_known_proposed_round: min_propose_round,
            ancestor_state_manager,
            round_tracker,
        }
        .recover()
    }

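    /// Re-establishes in-memory state from the persisted DAG after a restart: re-runs the
    /// commit rule and block proposal (which may not have run after the last storage write),
    /// and re-broadcasts the last proposed block if nothing new could be proposed.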
    fn recover(mut self) -> Self {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::recover"])
            .start_timer();

        // Try to commit and propose, since they may not have run after the last storage write.
        self.try_commit(vec![]).unwrap();

        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
        {
            last_proposed_block
        } else {
            let last_proposed_block = self.dag_state.read().get_last_proposed_block();

            if self.should_propose() {
                assert!(
                    last_proposed_block.round() > GENESIS_ROUND,
                    "At minimum a block of round higher than genesis should have been produced during recovery"
                );
            }

            // If no new block was proposed, re-broadcast the last proposed one to ensure liveness.
            self.signals
                .new_block(ExtendedBlock {
                    block: last_proposed_block.clone(),
                    excluded_ancestors: vec![],
                })
                .unwrap();
            last_proposed_block
        };

        // Try to set up leader timeout if needed.
        // This needs to be called after try_commit() and try_propose(), which may
        // have advanced the threshold clock round.
        self.try_signal_new_round();

        info!(
            "Core recovery completed with last proposed block {:?}",
            last_proposed_block
        );

        self
    }

    /// Processes the provided blocks and accepts them when their causal history exists.
    /// Returns the references of ancestors whose blocks are missing.
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_blocks");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::add_blocks"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_add_blocks_batch_size
            .observe(blocks.len() as f64);

        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);

        if !accepted_blocks.is_empty() {
            trace!(
                "Accepted blocks: {}",
                accepted_blocks
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            // Try to commit the new blocks if possible.
            self.try_commit(vec![])?;

            // Try to propose now since there are new blocks accepted.
            self.try_propose(false)?;

            // Now set up leader timeout if needed.
            // This needs to be called after try_commit() and try_propose(), which may
            // have advanced the threshold clock round.
            self.try_signal_new_round();
        };

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }

        Ok(missing_block_refs)
    }

    // Adds the certified commits that have been synced via the commit syncer. The commit info is used to skip running the decision
    // rule and immediately commit the corresponding leaders and sub dags. Note that no block acceptance happens here; it happens
    // internally in the `try_commit` method, which ensures that every time only the blocks corresponding to the certified commits
    // that are about to be committed are accepted.
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_certified_commits(
        &mut self,
        certified_commits: CertifiedCommits,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_certified_commits");

        let votes = certified_commits.votes().to_vec();
        let commits = self
            .filter_new_commits(certified_commits.commits().to_vec())
            .expect("Certified commits validation failed");

        // Try to accept the certified commit votes.
        // Even if they may not be part of a future commit, these blocks are useful for certifying
        // commits when helping peers sync commits.
        let (_, missing_block_refs) = self.block_manager.try_accept_blocks(votes);

        // Try to commit the new blocks. Take into account the trusted commits that have been provided.
        self.try_commit(commits)?;

        // Try to propose now since there are new blocks accepted.
        self.try_propose(false)?;

        // Now set up leader timeout if needed.
        // This needs to be called after try_commit() and try_propose(), which may
        // have advanced the threshold clock round.
        self.try_signal_new_round();

        Ok(missing_block_refs)
    }

    /// Checks if the provided block refs have been accepted. If not, the missing block refs are kept for synchronization.
    /// Returns the references of missing blocks among the input blocks.
    pub(crate) fn check_block_refs(
        &mut self,
        block_refs: Vec<BlockRef>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::check_block_refs");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::check_block_refs"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_check_block_refs_batch_size
            .observe(block_refs.len() as f64);

        // Try to find them via the block manager
        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

    /// If needed, signals a new clock round and sets up leader timeout.
    fn try_signal_new_round(&mut self) {
        // Signal only when the threshold clock round is more advanced than the last signaled round.
        //
        // NOTE: a signal is still sent even when a block has been proposed at the new round.
        // We can consider changing this in the future.
        let new_clock_round = self.dag_state.read().threshold_clock_round();
        if new_clock_round <= self.last_signaled_round {
            return;
        }
        // Then send a signal to set up leader timeout.
        self.signals.new_round(new_clock_round);
        self.last_signaled_round = new_clock_round;

        // Report the threshold clock round
        self.context
            .metrics
            .node_metrics
            .threshold_clock_round
            .set(new_clock_round as i64);
    }

    /// Creates a new block for the dictated round. This is used when a leader timeout occurs,
    /// either when the min or the max timeout expires. When `force = true`, checks such as
    /// previous round leader existence are skipped.
    pub(crate) fn new_block(
        &mut self,
        round: Round,
        force: bool,
    ) -> ConsensusResult<Option<VerifiedBlock>> {
        let _scope = monitored_scope("Core::new_block");
        if self.last_proposed_round() < round {
            self.context
                .metrics
                .node_metrics
                .leader_timeout_total
                .with_label_values(&[&format!("{force}")])
                .inc();
            let result = self.try_propose(force);
            // The threshold clock round may have advanced, so a signal needs to be sent.
            self.try_signal_new_round();
            return result;
        }
        Ok(None)
    }

    /// Keeps only the certified commits that have a commit index > last commit index.
    /// It also ensures that the first commit in the list is the next one in line, otherwise it returns an error.
    fn filter_new_commits(
        &mut self,
        commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CertifiedCommit>> {
        // Filter out the commits that have already been committed locally and keep only those above the last committed index.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let commits = commits
            .iter()
            .filter(|commit| {
                if commit.index() > last_commit_index {
                    true
                } else {
                    tracing::debug!(
                        "Skip commit for index {} as it is already committed with last commit index {}",
                        commit.index(),
                        last_commit_index
                    );
                    false
                }
            })
            .cloned()
            .collect::<Vec<_>>();

        // Make sure that the first commit we find is the next one in line and there is no gap.
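        // Example with illustrative indices: if last_commit_index is 10, synced commits
        // [11, 12, 13] are all kept, while [12, 13] alone would be rejected with
        // `UnexpectedCertifiedCommitIndex` because commit 11 is missing locally.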
        if let Some(commit) = commits.first()
            && commit.index() != last_commit_index + 1
        {
            return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
                expected_commit_index: last_commit_index + 1,
                commit_index: commit.index(),
            });
        }

        Ok(commits)
    }

    // Attempts to create a new block, persist and propose it to all peers.
    // When force is true, ignore whether the leader from the last round exists among the ancestors
    // and whether the minimum round delay has passed.
    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
        if !self.should_propose() {
            return Ok(None);
        }
        if let Some(extended_block) = self.try_new_block(force) {
            self.signals.new_block(extended_block.clone())?;

            fail_point!("consensus-after-propose");

            // The new block may help commit.
            self.try_commit(vec![])?;
            return Ok(Some(extended_block.block));
        }
        Ok(None)
    }

    /// Attempts to propose a new block for the next round. If a block has already been proposed for the latest
    /// or an earlier round, then no block is created and None is returned.
    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_new_block"])
            .start_timer();

        // Ensure the new block has a higher round than the last proposed block.
        let clock_round = {
            let dag_state = self.dag_state.read();
            let clock_round = dag_state.threshold_clock_round();
            if clock_round <= dag_state.get_last_proposed_block().round() {
                debug!(
                    "Skipping block proposal for round {} as it is not higher than the last proposed block {}",
                    clock_round,
                    dag_state.get_last_proposed_block().round()
                );
                return None;
            }
            clock_round
        };

        // There must be a quorum of blocks from the previous round.
        let quorum_round = clock_round.saturating_sub(1);

        // Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
        // or because we are actually ready to produce the block (leader exists and min delay has passed).
        if !force {
            if !self.leaders_exist(quorum_round) {
                return None;
            }

            if Duration::from_millis(
                self.context
                    .clock
                    .timestamp_utc_ms()
                    .saturating_sub(self.last_proposed_timestamp_ms()),
            ) < self.context.parameters.min_round_delay
            {
                debug!(
                    "Skipping block proposal for round {} as it is too soon after the last proposed block timestamp {}; min round delay is {}ms",
                    clock_round,
                    self.last_proposed_timestamp_ms(),
                    self.context.parameters.min_round_delay.as_millis(),
                );
                return None;
            }
        }

        // Determine the ancestors to be included in the proposal.
        let (ancestors, excluded_and_equivocating_ancestors) =
            self.smart_ancestors_to_propose(clock_round, !force);

        // If we did not find enough good ancestors to propose, continue to wait before proposing.
        if ancestors.is_empty() {
            assert!(
                !force,
                "Ancestors should have been returned if force is true!"
            );
            debug!(
                "Skipping block proposal for round {} because no good ancestor is found",
                clock_round,
            );
            return None;
        }

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) during proposal due to size limit",
                excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
            );
        }
        let excluded_ancestors = excluded_and_equivocating_ancestors
            .into_iter()
            .take(excluded_ancestors_limit)
            .collect();

        // Update the last included ancestor block refs
        for ancestor in &ancestors {
            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
        }

        let leader_authority = &self
            .context
            .committee
            .authority(self.first_leader(quorum_round))
            .hostname;
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_ms
            .with_label_values(&[leader_authority])
            .inc_by(
                Instant::now()
                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
                    .as_millis() as u64,
            );
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_count
            .with_label_values(&[leader_authority])
            .inc();

        self.context
            .metrics
            .node_metrics
            .proposed_block_ancestors
            .observe(ancestors.len() as f64);
        for ancestor in &ancestors {
            let authority = &self.context.committee.authority(ancestor.author()).hostname;
            self.context
                .metrics
                .node_metrics
                .proposed_block_ancestors_depth
                .with_label_values(&[authority])
                .observe(clock_round.saturating_sub(ancestor.round()).into());
        }

        let now = self.context.clock.timestamp_utc_ms();
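        // Ancestors with timestamps ahead of the local clock are still included; the drift is
        // only logged and reported via metrics below.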
        ancestors.iter().for_each(|block| {
            if block.timestamp_ms() > now {
                trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
                let authority = &self.context.committee.authority(block.author()).hostname;
                self.context
                    .metrics
                    .node_metrics
                    .proposed_block_ancestors_timestamp_drift_ms
                    .with_label_values(&[authority])
                    .inc_by(block.timestamp_ms().saturating_sub(now));
            }
        });

        // Consume the next transactions to be included. Do not drop the guards yet as this would acknowledge
        // the inclusion of transactions. Just let this be done at the end of the method.
        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
        self.context
            .metrics
            .node_metrics
            .proposed_block_transactions
            .observe(transactions.len() as f64);

        // Consume the commit votes to be included.
        let commit_votes = self
            .dag_state
            .write()
            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);

        let transaction_votes = if self.context.protocol_config.mysticeti_fastpath() {
            let new_causal_history = {
                let mut dag_state = self.dag_state.write();
                ancestors
                    .iter()
                    .flat_map(|ancestor| dag_state.link_causal_history(ancestor.reference()))
                    .collect()
            };
            self.transaction_certifier.get_own_votes(new_causal_history)
        } else {
            vec![]
        };

        // Create the block and insert to storage.
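        // With Mysticeti fastpath enabled, BlockV2 additionally carries the own transaction
        // votes collected above; otherwise the BlockV1 format without them is used.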
        let block = if self.context.protocol_config.mysticeti_fastpath() {
            Block::V2(BlockV2::new(
                self.context.committee.epoch(),
                clock_round,
                self.context.own_index,
                now,
                ancestors.iter().map(|b| b.reference()).collect(),
                transactions,
                commit_votes,
                transaction_votes,
                vec![],
            ))
        } else {
            Block::V1(BlockV1::new(
                self.context.committee.epoch(),
                clock_round,
                self.context.own_index,
                now,
                ancestors.iter().map(|b| b.reference()).collect(),
                transactions,
                commit_votes,
                vec![],
            ))
        };
        let signed_block =
            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
        let serialized = signed_block
            .serialize()
            .expect("Block serialization failed.");
        self.context
            .metrics
            .node_metrics
            .proposed_block_size
            .observe(serialized.len() as f64);
        // Own blocks are assumed to be valid.
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

        // Record the interval from the last proposal, before accepting the proposed block.
        let last_proposed_block = self.last_proposed_block();
        if last_proposed_block.round() > 0 {
            self.context
                .metrics
                .node_metrics
                .block_proposal_interval
                .observe(
                    Duration::from_millis(
                        verified_block
                            .timestamp_ms()
                            .saturating_sub(last_proposed_block.timestamp_ms()),
                    )
                    .as_secs_f64(),
                );
        }

        // Accept the block into BlockManager and DagState.
        let (accepted_blocks, missing) = self
            .block_manager
            .try_accept_blocks(vec![verified_block.clone()]);
        assert_eq!(accepted_blocks.len(), 1);
        assert!(missing.is_empty());

        // The block must be added to the transaction certifier before it is broadcasted or added to DagState.
        // Update the proposed state of blocks in the local DAG.
        // TODO(fastpath): move this logic and the logic afterwards to proposed block handler.
        if self.context.protocol_config.mysticeti_fastpath() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), vec![])]);
            self.dag_state
                .write()
                .link_causal_history(verified_block.reference());
        }

        // Ensure the new block and its ancestors are persisted, before broadcasting it.
        self.dag_state.write().flush();

        // Now acknowledge the transactions for their inclusion in the block.
        ack_transactions(verified_block.reference());

        info!("Created block {verified_block:?} for round {clock_round}");

        self.context
            .metrics
            .node_metrics
            .proposed_blocks
            .with_label_values(&[&force.to_string()])
            .inc();

        let extended_block = ExtendedBlock {
            block: verified_block,
            excluded_ancestors,
        };

        // Update round tracker with our own highest accepted blocks
        self.round_tracker
            .write()
            .update_from_verified_block(&extended_block);

        Some(extended_block)
    }

    /// Runs the commit rule to attempt to commit additional blocks from the DAG. If any `certified_commits` are provided, then
    /// it will attempt to commit those first before trying to commit any further leaders.
    fn try_commit(
        &mut self,
        mut certified_commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_commit"])
            .start_timer();

        let mut certified_commits_map = BTreeMap::new();
        for c in &certified_commits {
            certified_commits_map.insert(c.index(), c.reference());
        }

        if !certified_commits.is_empty() {
            info!(
                "Processing synced commits: {:?}",
                certified_commits
                    .iter()
                    .map(|c| (c.index(), c.leader()))
                    .collect::<Vec<_>>()
            );
        }

        let mut committed_sub_dags = Vec::new();
        // TODO: Add optimization to abort early without quorum for a round.
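        // Each iteration of the loop commits at most `commits_until_update` leaders, taken
        // either from the synced certified commits or from the local decision rule, and then
        // re-checks whether the leader schedule needs to be updated before continuing.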
        loop {
            // LeaderSchedule has a limit to how many sequenced leaders can be committed
            // before a change is triggered. Calling into the leader schedule returns how many
            // commits remain until the next leader schedule change. We will loop back and
            // recalculate any discarded leaders with the new schedule.
            let mut commits_until_update = self
                .leader_schedule
                .commits_until_leader_schedule_update(self.dag_state.clone());

            if commits_until_update == 0 {
                let last_commit_index = self.dag_state.read().last_commit_index();

                tracing::info!(
                    "Leader schedule change triggered at commit index {last_commit_index}"
                );

                self.leader_schedule
                    .update_leader_schedule_v2(&self.dag_state);

                let propagation_scores = self
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores
                    .clone();
                self.ancestor_state_manager
                    .set_propagation_scores(propagation_scores);

                commits_until_update = self
                    .leader_schedule
                    .commits_until_leader_schedule_update(self.dag_state.clone());

                fail_point!("consensus-after-leader-schedule-change");
            }
            assert!(commits_until_update > 0);

            // If there are certified commits to process, find out which leaders and commits from them
            // are decided and use them as the next commits.
            let (certified_leaders, decided_certified_commits): (
                Vec<DecidedLeader>,
                Vec<CertifiedCommit>,
            ) = self
                .try_select_certified_leaders(&mut certified_commits, commits_until_update)
                .into_iter()
                .unzip();

            // Only accept blocks for the certified commits that we are certain to sequence.
            // This ensures that only blocks corresponding to committed certified commits are flushed to disk.
            // Blocks from non-committed certified commits will not be flushed, preventing issues during crash-recovery.
            // This avoids scenarios where accepting and flushing blocks of non-committed certified commits could lead to
            // premature commit rule execution. Due to GC, this could cause a panic if the commit rule tries to access
            // missing causal history from blocks of certified commits.
            let blocks = decided_certified_commits
                .iter()
                .flat_map(|c| c.blocks())
                .cloned()
                .collect::<Vec<_>>();
            self.block_manager.try_accept_committed_blocks(blocks);

            // If there is no certified commit to process, run the decision rule.
            let (decided_leaders, local) = if certified_leaders.is_empty() {
                // TODO: limit commits by commits_until_update for efficiency, which may be needed when leader schedule length is reduced.
                let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
                // Truncate the decided leaders to fit the commit schedule limit.
                if decided_leaders.len() >= commits_until_update {
                    let _ = decided_leaders.split_off(commits_until_update);
                }
                (decided_leaders, true)
            } else {
                (certified_leaders, false)
            };

            // If the decided leaders list is empty then just break the loop.
            let Some(last_decided) = decided_leaders.last().cloned() else {
                break;
            };

            self.last_decided_leader = last_decided.slot();
            self.context
                .metrics
                .node_metrics
                .last_decided_leader_round
                .set(self.last_decided_leader.round as i64);

            let sequenced_leaders = decided_leaders
                .into_iter()
                .filter_map(|leader| leader.into_committed_block())
                .collect::<Vec<_>>();
            // It's possible to reach this point when all the decided leaders are "Skip" decisions. In that case there is no
            // leader to commit and we should break the loop.
            if sequenced_leaders.is_empty() {
                break;
            }
            tracing::info!(
                "Committing {} leaders: {}; {} commits before next leader schedule change",
                sequenced_leaders.len(),
                sequenced_leaders
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(","),
                commits_until_update,
            );

            // TODO: refcount subdags
            let subdags = self
                .commit_observer
                .handle_commit(sequenced_leaders, local)?;

            // Try to unsuspend blocks if gc_round has advanced.
            self.block_manager
                .try_unsuspend_blocks_for_latest_gc_round();

            committed_sub_dags.extend(subdags);

            fail_point!("consensus-after-handle-commit");
        }

        // Sanity check: for commits that have been linearized using the certified commits, ensure that the same sub dag has been committed.
        for sub_dag in &committed_sub_dags {
            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
                assert_eq!(
                    commit_ref, sub_dag.commit_ref,
                    "Certified commit has different reference than the committed sub dag"
                );
            }
        }

        // Notify about our own committed blocks
        let committed_block_refs = committed_sub_dags
            .iter()
            .flat_map(|sub_dag| sub_dag.blocks.iter())
            .filter_map(|block| {
                (block.author() == self.context.own_index).then_some(block.reference())
            })
            .collect::<Vec<_>>();
        self.transaction_consumer
            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());

        Ok(committed_sub_dags)
    }

    pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
        let _scope = monitored_scope("Core::get_missing_blocks");
        self.block_manager.missing_blocks()
    }

    /// Sets the delay by round for propagating blocks to a quorum.
    pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
        info!("Propagation round delay set to: {delay}");
        self.propagation_delay = delay;
    }

    /// Sets the minimum propose round, allowing the proposer to propose blocks only for round numbers
    /// `> last_known_proposed_round`. The method may only be called once; calling it again panics.
    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
        if self.last_known_proposed_round.is_some() {
            panic!(
                "Should not attempt to set the last known proposed round if that has been already set"
            );
        }
        self.last_known_proposed_round = Some(round);
        info!("Last known proposed round set to {round}");
    }

    /// Whether the core should propose new blocks.
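    /// Proposals are skipped when the observed propagation delay exceeds the configured
    /// threshold, or when the last known proposed round has not been synced yet or is not
    /// lower than the current threshold clock round.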
    pub(crate) fn should_propose(&self) -> bool {
        let clock_round = self.dag_state.read().threshold_clock_round();
        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;

        if self.propagation_delay
            > self
                .context
                .parameters
                .propagation_delay_stop_proposal_threshold
        {
            debug!(
                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
                self.propagation_delay,
                self.context
                    .parameters
                    .propagation_delay_stop_proposal_threshold
            );
            core_skipped_proposals
                .with_label_values(&["high_propagation_delay"])
                .inc();
            return false;
        }

        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
            debug!(
                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
            );
            core_skipped_proposals
                .with_label_values(&["no_last_known_proposed_round"])
                .inc();
            return false;
        };
        if clock_round <= last_known_proposed_round {
            debug!(
                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
            );
            core_skipped_proposals
                .with_label_values(&["higher_last_known_proposed_round"])
                .inc();
            return false;
        }

        true
    }

    // Tries to select a prefix of certified commits to be committed next, respecting the `limit`.
    // Panics if the provided `limit` is zero.
    // The function returns a list of certified leaders and certified commits. If an empty vector is returned,
    // there are no certified commits to be committed: the input `certified_commits` is either empty or all of
    // the certified commits have already been committed.
    #[tracing::instrument(skip_all)]
    fn try_select_certified_leaders(
        &mut self,
        certified_commits: &mut Vec<CertifiedCommit>,
        limit: usize,
    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
        assert!(limit > 0, "limit should be greater than 0");
        if certified_commits.is_empty() {
            return vec![];
        }

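        // Example with illustrative numbers: with limit = 2 and three pending certified
        // commits, only the first two are drained and committed in this iteration; the third
        // remains in `certified_commits` for a later iteration.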
        let to_commit = if certified_commits.len() >= limit {
            // We keep only the number of leaders as dictated by the `limit`
            certified_commits.drain(..limit).collect::<Vec<_>>()
        } else {
            // Otherwise just take all of them and leave `certified_commits` empty.
            std::mem::take(certified_commits)
        };

        tracing::debug!(
            "Selected {} certified leaders: {}",
            to_commit.len(),
            to_commit.iter().map(|c| c.leader().to_string()).join(",")
        );

        to_commit
            .into_iter()
            .map(|commit| {
                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
                // There is no knowledge of direct commit with certified commits, so assuming indirect commit.
                let leader = DecidedLeader::Commit(leader.clone(), /* direct */ false);
                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
                (leader, commit)
            })
            .collect::<Vec<_>>()
    }

    /// Retrieves the next ancestors to propose to form a block at `clock_round`.
    /// If smart selection is enabled then this will try to select the best ancestors
    /// based on the propagation scores of the authorities.
    fn smart_ancestors_to_propose(
        &mut self,
        clock_round: Round,
        smart_select: bool,
    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
        let node_metrics = &self.context.metrics.node_metrics;
        let _s = node_metrics
            .scope_processing_time
            .with_label_values(&["Core::smart_ancestors_to_propose"])
            .start_timer();

        // Now take the ancestors before the clock_round (excluded) for each authority.
        let all_ancestors = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(clock_round);

        assert_eq!(
            all_ancestors.len(),
            self.context.committee.size(),
            "Fatal error, number of returned ancestors don't match committee size."
        );

        // Ensure ancestor state is up to date before selecting for proposal.
        let accepted_quorum_rounds = self.round_tracker.read().compute_accepted_quorum_rounds();

        self.ancestor_state_manager
            .update_all_ancestors_state(&accepted_quorum_rounds);

        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();

        let quorum_round = clock_round.saturating_sub(1);
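        // The proposal must reference a quorum of ancestors from `quorum_round`; low scoring
        // ancestors at that round are pulled back in below if needed to reach the quorum, and
        // others only if they have been accepted by a quorum of the network.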
1061
1062        let mut score_and_pending_excluded_ancestors = Vec::new();
1063        let mut excluded_and_equivocating_ancestors = BTreeSet::new();
1064
1065        // Propose only ancestors of higher rounds than what has already been proposed.
1066        // And always include own last proposed block first among ancestors.
1067        // Start by only including the high scoring ancestors. Low scoring ancestors
1068        // will be included in a second pass below.
1069        let included_ancestors = iter::once(self.last_proposed_block().clone())
1070            .chain(
1071                all_ancestors
1072                    .into_iter()
1073                    .flat_map(|(ancestor, equivocating_ancestors)| {
1074                        if ancestor.author() == self.context.own_index {
1075                            return None;
1076                        }
1077                        if let Some(last_block_ref) =
1078                            self.last_included_ancestors[ancestor.author()]
1079                            && last_block_ref.round >= ancestor.round() {
1080                                return None;
1081                            }
1082
1083                        // We will never include equivocating ancestors so add them immediately
1084                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);
1085
1086                        let ancestor_state = ancestor_state_map[ancestor.author()];
1087                        match ancestor_state {
1088                            AncestorState::Include => {
1089                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
1090                            }
1091                            AncestorState::Exclude(score) => {
1092                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
1093                                score_and_pending_excluded_ancestors.push((score, ancestor));
1094                                return None;
1095                            }
1096                        }
1097
1098                        Some(ancestor)
1099                    }),
1100            )
1101            .collect::<Vec<_>>();
1102
1103        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();
1104
1105        // Check total stake of high scoring parent round ancestors
1106        for ancestor in included_ancestors
1107            .iter()
1108            .filter(|a| a.round() == quorum_round)
1109        {
1110            parent_round_quorum.add(ancestor.author(), &self.context.committee);
1111        }
1112
1113        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
1114            node_metrics.smart_selection_wait.inc();
1115            debug!(
1116                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
1117                parent_round_quorum.stake()
1118            );
1119            return (vec![], BTreeSet::new());
1120        }
1121
1122        // Sort scores descending so we can include the best of the pending excluded
1123        // ancestors first until we reach the threshold.
1124        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));
1125
1126        let mut ancestors_to_propose = included_ancestors;
1127        let mut excluded_ancestors = Vec::new();
1128        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
1129            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
1130            if !parent_round_quorum.reached_threshold(&self.context.committee)
1131                && ancestor.round() == quorum_round
1132            {
1133                debug!(
1134                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
1135                );
1136                parent_round_quorum.add(ancestor.author(), &self.context.committee);
1137                ancestors_to_propose.push(ancestor);
1138                node_metrics
1139                    .included_excluded_proposal_ancestors_count_by_authority
1140                    .with_label_values(&[block_hostname, "timeout"])
1141                    .inc();
1142            } else {
1143                excluded_ancestors.push((score, ancestor));
1144            }
1145        }
1146
1147        // Iterate through excluded ancestors and include the ancestor or the ancestor's ancestor
1148        // that has been accepted by a quorum of the network. If the original ancestor itself
1149        // is not included then it will be part of excluded ancestors that are not
1150        // included in the block but will still be broadcasted to peers.
1151        for (score, ancestor) in excluded_ancestors.iter() {
1152            let excluded_author = ancestor.author();
1153            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
1154            // A quorum of validators reported to have accepted blocks from the excluded_author up to the low quorum round.
1155            let mut accepted_low_quorum_round = accepted_quorum_rounds[excluded_author].0;
1156            // If the accepted quorum round of this ancestor is greater than or equal
1157            // to the clock round then we want to make sure to set it to clock_round - 1
1158            // as that is the max round the new block can include as an ancestor.
1159            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);
1160
1161            let last_included_round = self.last_included_ancestors[excluded_author]
1162                .map(|block_ref| block_ref.round)
1163                .unwrap_or(GENESIS_ROUND);
1164            if ancestor.round() <= last_included_round {
1165                // This should have already been filtered out when filtering all_ancestors.
1166                // Still, ensure previously included ancestors are filtered out.
1167                continue;
1168            }
1169
1170            if last_included_round >= accepted_low_quorum_round {
1171                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1172                trace!(
1173                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
1174                    ancestor.reference()
1175                );
1176                node_metrics
1177                    .excluded_proposal_ancestors_count_by_authority
1178                    .with_label_values(&[block_hostname])
1179                    .inc();
1180                continue;
1181            }
1182
1183            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
1184                // Include the ancestor block as it has been seen & accepted by a strong quorum.
1185                ancestor.clone()
1186            } else {
1187                // Exclude this ancestor since it hasn't been accepted by a strong quorum
1188                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1189                trace!(
1190                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
1191                    ancestor.reference(),
1192                    ancestor.round()
1193                );
1194                node_metrics
1195                    .excluded_proposal_ancestors_count_by_authority
1196                    .with_label_values(&[block_hostname])
1197                    .inc();
1198
1199                // Look for an earlier block in the ancestor chain that we can include as there
1200                // is a gap between the last included round and the accepted low quorum round.
1201                //
1202                // Note: Only cached blocks need to be propagated. Committed and GC'ed blocks
1203                // do not need to be propagated.
1204                match self.dag_state.read().get_last_cached_block_in_range(
1205                    excluded_author,
1206                    last_included_round + 1,
1207                    accepted_low_quorum_round + 1,
1208                ) {
1209                    Some(earlier_ancestor) => {
1210                        // Found an earlier block that has been propagated well - include it instead
1211                        earlier_ancestor
1212                    }
1213                    None => {
1214                        // No suitable earlier block found
1215                        continue;
1216                    }
1217                }
1218            };
1219            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
1220            ancestors_to_propose.push(ancestor.clone());
1221            trace!(
1222                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
1223                ancestor.reference()
1224            );
1225            node_metrics
1226                .included_excluded_proposal_ancestors_count_by_authority
1227                .with_label_values(&[block_hostname, "quorum"])
1228                .inc();
1229        }
1230
1231        assert!(
1232            parent_round_quorum.reached_threshold(&self.context.committee),
1233            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1234        );
1235
1236        debug!(
1237            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
1238            ancestors_to_propose.len(),
1239            excluded_and_equivocating_ancestors.len()
1240        );
1241
1242        (ancestors_to_propose, excluded_and_equivocating_ancestors)
1243    }
1244
1245    /// Checks whether all the leaders of the round exist.
1246    /// TODO: we can leverage some additional signal here in order to more cleverly manipulate later the leader timeout
1247    /// Ex if we already have one leader - the first in order - we might don't want to wait as much.
1248    fn leaders_exist(&self, round: Round) -> bool {
1249        let dag_state = self.dag_state.read();
1250        for leader in self.leaders(round) {
1251            // Search for all the leaders. If at least one is not found, then return false.
1252            // A linear search should be fine here as the set of elements is expected to be small, and more sophisticated
1253            // data structures would not give us much here.
1254            if !dag_state.contains_cached_block_at_slot(leader) {
1255                return false;
1256            }
1257        }
1258
1259        true
1260    }
1261
1262    /// Returns the leaders of the provided round.
1263    fn leaders(&self, round: Round) -> Vec<Slot> {
1264        self.committer
1265            .get_leaders(round)
1266            .into_iter()
1267            .map(|authority_index| Slot::new(round, authority_index))
1268            .collect()
1269    }
1270
1271    /// Returns the 1st leader of the round.
1272    fn first_leader(&self, round: Round) -> AuthorityIndex {
1273        self.leaders(round).first().unwrap().authority
1274    }
1275
1276    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1277        self.last_proposed_block().timestamp_ms()
1278    }
1279
1280    fn last_proposed_round(&self) -> Round {
1281        self.last_proposed_block().round()
1282    }
1283
1284    fn last_proposed_block(&self) -> VerifiedBlock {
1285        self.dag_state.read().get_last_proposed_block()
1286    }
1287}
1288
1289/// Senders of signals from Core, for outputs and events (e.g. new block produced).
1290pub(crate) struct CoreSignals {
1291    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
1292    new_round_sender: watch::Sender<Round>,
1293    context: Arc<Context>,
1294}
1295
1296impl CoreSignals {
1297    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1298        // The number of blocks buffered in the broadcast channel should be roughly equal to those cached in dag state.
1299        // Since the underlying blocks are ref counted, a lower buffer here would not reduce memory
1300        // usage significantly.
1301        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1302            context.parameters.dag_state_cached_rounds as usize,
1303        );
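        // The new-round watch channel starts at round 0; Core updates it whenever the threshold clock advances to a new round.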
1304        let (new_round_sender, new_round_receiver) = watch::channel(0);
1305
1306        let me = Self {
1307            tx_block_broadcast,
1308            new_round_sender,
1309            context,
1310        };
1311
1312        let receivers = CoreSignalsReceivers {
1313            rx_block_broadcast,
1314            new_round_receiver,
1315        };
1316
1317        (me, receivers)
1318    }
1319
1320    /// Sends a signal to all the waiters that a new block has been produced. Returns an error
1321    /// if the block could not be delivered to any subscriber.
1322    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1323        // When there is only one authority in committee, it is unnecessary to broadcast
1324        // the block which will fail anyway without subscribers to the signal.
1325        if self.context.committee.size() > 1 {
1326            if extended_block.block.round() == GENESIS_ROUND {
1327                debug!("Ignoring broadcasting genesis block to peers");
1328                return Ok(());
1329            }
1330
1331            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1332                warn!("Couldn't broadcast the block to any receiver: {err}");
1333                return Err(ConsensusError::Shutdown);
1334            }
1335        } else {
1336            debug!(
1337                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1338            );
1339        }
1340        Ok(())
1341    }
1342
1343    /// Sends a signal that the threshold clock has advanced to a new round. The `round_number` is the round
1344    /// to which the threshold clock has advanced.
1345    pub(crate) fn new_round(&mut self, round_number: Round) {
1346        let _ = self.new_round_sender.send_replace(round_number);
1347    }
1348}
1349
1350/// Receivers of signals from Core.
1351/// Intentionally un-clonable. Components should only subscribe to the channels they need.
1352pub(crate) struct CoreSignalsReceivers {
1353    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
1354    new_round_receiver: watch::Receiver<Round>,
1355}
1356
1357impl CoreSignalsReceivers {
1358    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1359        self.rx_block_broadcast.resubscribe()
1360    }
1361
1362    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1363        self.new_round_receiver.clone()
1364    }
1365}
1366
1367/// Creates cores for the specified number of authorities with their corresponding stakes. The cores
1368/// and their respective signal receivers are returned in ascending `AuthorityIndex` order.
1369#[cfg(test)]
1370pub(crate) async fn create_cores(
1371    context: Context,
1372    authorities: Vec<Stake>,
1373) -> Vec<CoreTextFixture> {
1374    let mut cores = Vec::new();
1375
1376    for index in 0..authorities.len() {
1377        let own_index = AuthorityIndex::new_for_test(index as u32);
1378        let core =
1379            CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false).await;
1380        cores.push(core);
1381    }
1382    cores
1383}
1384
1385#[cfg(test)]
1386pub(crate) struct CoreTextFixture {
1387    pub(crate) core: Core,
1388    pub(crate) transaction_certifier: TransactionCertifier,
1389    pub(crate) signal_receivers: CoreSignalsReceivers,
1390    pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
1391    pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
1392    pub(crate) dag_state: Arc<RwLock<DagState>>,
1393    pub(crate) store: Arc<MemStore>,
1394}
1395
1396#[cfg(test)]
1397impl CoreTextFixture {
1398    async fn new(
1399        context: Context,
1400        authorities: Vec<Stake>,
1401        own_index: AuthorityIndex,
1402        sync_last_known_own_block: bool,
1403    ) -> Self {
1404        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
1405        let mut context = context.clone();
1406        context = context
1407            .with_committee(committee)
1408            .with_authority_index(own_index);
1409        context
1410            .protocol_config
1411            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1412
1413        let context = Arc::new(context);
1414        let store = Arc::new(MemStore::new());
1415        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1416
1417        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1418        let leader_schedule = Arc::new(
1419            LeaderSchedule::from_store(context.clone(), dag_state.clone())
1420                .with_num_commits_per_schedule(10),
1421        );
1422        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1423        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1424        let (blocks_sender, _blocks_receiver) =
1425            mysten_metrics::monitored_mpsc::unbounded_channel("consensus_block_output");
1426        let transaction_certifier = TransactionCertifier::new(
1427            context.clone(),
1428            Arc::new(NoopBlockVerifier {}),
1429            dag_state.clone(),
1430            blocks_sender,
1431        );
1432        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1433        // Need at least one subscriber to the block broadcast channel.
1434        let block_receiver = signal_receivers.block_broadcast_receiver();
1435
1436        let (commit_consumer, commit_output_receiver) = CommitConsumerArgs::new(0, 0);
1437        let commit_observer = CommitObserver::new(
1438            context.clone(),
1439            commit_consumer,
1440            dag_state.clone(),
1441            transaction_certifier.clone(),
1442            leader_schedule.clone(),
1443        )
1444        .await;
1445
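        // Our own authority's protocol key pair, taken from the test signers and used by Core to sign the blocks it proposes.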
1446        let block_signer = signers.remove(own_index.value()).1;
1447
1448        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1449        let core = Core::new(
1450            context,
1451            leader_schedule,
1452            transaction_consumer,
1453            transaction_certifier.clone(),
1454            block_manager,
1455            commit_observer,
1456            signals,
1457            block_signer,
1458            dag_state.clone(),
1459            sync_last_known_own_block,
1460            round_tracker,
1461        );
1462
1463        Self {
1464            core,
1465            transaction_certifier,
1466            signal_receivers,
1467            block_receiver,
1468            _commit_output_receiver: commit_output_receiver,
1469            dag_state,
1470            store,
1471        }
1472    }
1473
1474    pub(crate) fn add_blocks(
1475        &mut self,
1476        blocks: Vec<VerifiedBlock>,
1477    ) -> ConsensusResult<BTreeSet<BlockRef>> {
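        // Register the blocks with the transaction certifier (with no reject votes) before handing them to Core.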
1478        self.transaction_certifier
1479            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
1480        self.core.add_blocks(blocks)
1481    }
1482}
1483
1484#[cfg(test)]
1485mod test {
1486    use std::{collections::BTreeSet, time::Duration};
1487
1488    use consensus_config::{AuthorityIndex, Parameters};
1489    use consensus_types::block::TransactionIndex;
1490    use futures::{StreamExt, stream::FuturesUnordered};
1491    use mysten_metrics::monitored_mpsc;
1492    use sui_protocol_config::ProtocolConfig;
1493    use tokio::time::sleep;
1494
1495    use super::*;
1496    use crate::{
1497        CommitConsumerArgs, CommitIndex,
1498        block::{TestBlock, genesis_blocks},
1499        block_verifier::NoopBlockVerifier,
1500        commit::CommitAPI,
1501        leader_scoring::ReputationScores,
1502        storage::{Store, WriteBatch, mem_store::MemStore},
1503        test_dag_builder::DagBuilder,
1504        test_dag_parser::parse_dag,
1505        transaction::{BlockStatus, TransactionClient},
1506    };
1507
1508    /// Recover Core and continue proposing from the last round which forms a quorum.
1509    #[tokio::test]
1510    async fn test_core_recover_from_store_for_full_round() {
1511        telemetry_subscribers::init_for_testing();
1512        let (context, mut key_pairs) = Context::new_for_test(4);
1513        let context = Arc::new(context);
1514        let store = Arc::new(MemStore::new());
1515        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1516        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1517        let mut block_status_subscriptions = FuturesUnordered::new();
1518
1519        // Create test blocks for all the authorities for 4 rounds and populate them in store
1520        let mut last_round_blocks = genesis_blocks(&context);
1521        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1522        for round in 1..=4 {
1523            let mut this_round_blocks = Vec::new();
1524            for (index, _authority) in context.committee.authorities() {
1525                let block = VerifiedBlock::new_for_test(
1526                    TestBlock::new(round, index.value() as u32)
1527                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1528                        .build(),
1529                );
1530
1531                // If it's a round 1 block - which will be committed later on - and it's our "own" block, then subscribe to listen for the block status.
1532                if round == 1 && index == context.own_index {
1533                    let subscription =
1534                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
1535                    block_status_subscriptions.push(subscription);
1536                }
1537
1538                this_round_blocks.push(block);
1539            }
1540            all_blocks.extend(this_round_blocks.clone());
1541            last_round_blocks = this_round_blocks;
1542        }
1543        // write them in store
1544        store
1545            .write(WriteBatch::default().blocks(all_blocks))
1546            .expect("Storage error");
1547
1548        // create dag state after all blocks have been written to store
1549        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1550        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1551        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1552            context.clone(),
1553            dag_state.clone(),
1554        ));
1555        let (blocks_sender, _blocks_receiver) =
1556            monitored_mpsc::unbounded_channel("consensus_block_output");
1557        let transaction_certifier = TransactionCertifier::new(
1558            context.clone(),
1559            Arc::new(NoopBlockVerifier {}),
1560            dag_state.clone(),
1561            blocks_sender,
1562        );
1563
1564        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1565        let commit_observer = CommitObserver::new(
1566            context.clone(),
1567            commit_consumer,
1568            dag_state.clone(),
1569            transaction_certifier.clone(),
1570            leader_schedule.clone(),
1571        )
1572        .await;
1573
1574        // Check no commits have been persisted to dag_state or store.
1575        let last_commit = store.read_last_commit().unwrap();
1576        assert!(last_commit.is_none());
1577        assert_eq!(dag_state.read().last_commit_index(), 0);
1578
1579        // Now spin up core
1580        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1581        let (blocks_sender, _blocks_receiver) =
1582            monitored_mpsc::unbounded_channel("consensus_block_output");
1583        let transaction_certifier = TransactionCertifier::new(
1584            context.clone(),
1585            Arc::new(NoopBlockVerifier {}),
1586            dag_state.clone(),
1587            blocks_sender,
1588        );
1589        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
1590        // Need at least one subscriber to the block broadcast channel.
1591        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1592        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1593        let _core = Core::new(
1594            context.clone(),
1595            leader_schedule,
1596            transaction_consumer,
1597            transaction_certifier.clone(),
1598            block_manager,
1599            commit_observer,
1600            signals,
1601            key_pairs.remove(context.own_index.value()).1,
1602            dag_state.clone(),
1603            false,
1604            round_tracker,
1605        );
1606
1607        // New round should be 5
1608        let mut new_round = signal_receivers.new_round_receiver();
1609        assert_eq!(*new_round.borrow_and_update(), 5);
1610
1611        // Block for round 5 should have been proposed.
1612        let proposed_block = block_receiver
1613            .recv()
1614            .await
1615            .expect("A block should have been created");
1616        assert_eq!(proposed_block.block.round(), 5);
1617        let ancestors = proposed_block.block.ancestors();
1618
1619        // Only ancestors of round 4 should be included.
1620        assert_eq!(ancestors.len(), 4);
1621        for ancestor in ancestors {
1622            assert_eq!(ancestor.round, 4);
1623        }
1624
1625        // Flush the DAG state to storage.
1626        dag_state.write().flush();
1627
1628        // There were no commits prior to the core starting up, but there were completed
1629        // rounds up to and including round 4. So we should commit leaders in rounds 1 & 2
1630        // as soon as the new block for round 5 is proposed.
1631        let last_commit = store
1632            .read_last_commit()
1633            .unwrap()
1634            .expect("last commit should be set");
1635        assert_eq!(last_commit.index(), 2);
1636        assert_eq!(dag_state.read().last_commit_index(), 2);
1637        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1638        assert_eq!(all_stored_commits.len(), 2);
1639
1640        // And ensure that our "own" round 1 block was sent to the TransactionConsumer as a notification alongside the gc_round.
1641        while let Some(result) = block_status_subscriptions.next().await {
1642            let status = result.unwrap();
1643            assert!(matches!(status, BlockStatus::Sequenced(_)));
1644        }
1645    }
1646
1647    /// Recover Core and continue proposing when the last round is partial - it doesn't form a quorum - and we haven't
1648    /// proposed for that round yet.
1649    #[tokio::test]
1650    async fn test_core_recover_from_store_for_partial_round() {
1651        telemetry_subscribers::init_for_testing();
1652
1653        let (context, mut key_pairs) = Context::new_for_test(4);
1654        let context = Arc::new(context);
1655        let store = Arc::new(MemStore::new());
1656        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1657        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1658
1659        // Create test blocks for all authorities except ours (index = 0).
1660        let mut last_round_blocks = genesis_blocks(&context);
1661        let mut all_blocks = last_round_blocks.clone();
1662        for round in 1..=4 {
1663            let mut this_round_blocks = Vec::new();
1664
1665            // For round 4 only produce f+1 blocks. Skip our validator (position 0) and the one at position 1 from creating blocks.
1666            let authorities_to_skip = if round == 4 {
1667                context.committee.validity_threshold() as usize
1668            } else {
1669                // otherwise always skip creating a block for our authority
1670                1
1671            };
1672
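            // With a committee of 4, validity_threshold() is 2, so skipping that many authorities leaves only f+1 = 2 blocks in round 4.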
1673            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1674                let block = TestBlock::new(round, index.value() as u32)
1675                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1676                    .build();
1677                this_round_blocks.push(VerifiedBlock::new_for_test(block));
1678            }
1679            all_blocks.extend(this_round_blocks.clone());
1680            last_round_blocks = this_round_blocks;
1681        }
1682
1683        // write them in store
1684        store
1685            .write(WriteBatch::default().blocks(all_blocks))
1686            .expect("Storage error");
1687
1688        // create dag state after all blocks have been written to store
1689        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1690        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1691        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1692            context.clone(),
1693            dag_state.clone(),
1694        ));
1695        let (blocks_sender, _blocks_receiver) =
1696            monitored_mpsc::unbounded_channel("consensus_block_output");
1697        let transaction_certifier = TransactionCertifier::new(
1698            context.clone(),
1699            Arc::new(NoopBlockVerifier {}),
1700            dag_state.clone(),
1701            blocks_sender,
1702        );
1703
1704        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1705        let commit_observer = CommitObserver::new(
1706            context.clone(),
1707            commit_consumer,
1708            dag_state.clone(),
1709            transaction_certifier.clone(),
1710            leader_schedule.clone(),
1711        )
1712        .await;
1713
1714        // Check no commits have been persisted to dag_state & store
1715        let last_commit = store.read_last_commit().unwrap();
1716        assert!(last_commit.is_none());
1717        assert_eq!(dag_state.read().last_commit_index(), 0);
1718
1719        // Now spin up core
1720        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1721        let (blocks_sender, _blocks_receiver) =
1722            monitored_mpsc::unbounded_channel("consensus_block_output");
1723        let transaction_certifier = TransactionCertifier::new(
1724            context.clone(),
1725            Arc::new(NoopBlockVerifier {}),
1726            dag_state.clone(),
1727            blocks_sender,
1728        );
1729        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
1730        // Need at least one subscriber to the block broadcast channel.
1731        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1732        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1733        let mut core = Core::new(
1734            context.clone(),
1735            leader_schedule,
1736            transaction_consumer,
1737            transaction_certifier,
1738            block_manager,
1739            commit_observer,
1740            signals,
1741            key_pairs.remove(context.own_index.value()).1,
1742            dag_state.clone(),
1743            false,
1744            round_tracker,
1745        );
1746
1747        // Clock round should have advanced to 5 during recovery: our own round 4 proposal,
1748        // together with the existing f+1 round 4 blocks, forms a quorum in round 4.
1749        let mut new_round = signal_receivers.new_round_receiver();
1750        assert_eq!(*new_round.borrow_and_update(), 5);
1751
1752        // During recovery, round 4 block should have been proposed.
1753        let proposed_block = block_receiver
1754            .recv()
1755            .await
1756            .expect("A block should have been created");
1757        assert_eq!(proposed_block.block.round(), 4);
1758        let ancestors = proposed_block.block.ancestors();
1759
1760        assert_eq!(ancestors.len(), 4);
1761        for ancestor in ancestors {
1762            if ancestor.author == context.own_index {
1763                assert_eq!(ancestor.round, 0);
1764            } else {
1765                assert_eq!(ancestor.round, 3);
1766            }
1767        }
1768
1769        // Run commit rule.
1770        core.try_commit(vec![]).ok();
1771
1772        // Flush the DAG state to storage.
1773        core.dag_state.write().flush();
1774
1775        // There were no commits prior to the core starting up, but there were completed
1776        // rounds up to round 4. So we should commit leaders in rounds 1 & 2 as soon
1777        // as the new block for round 4 is proposed.
1778        let last_commit = store
1779            .read_last_commit()
1780            .unwrap()
1781            .expect("last commit should be set");
1782        assert_eq!(last_commit.index(), 2);
1783        assert_eq!(dag_state.read().last_commit_index(), 2);
1784        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1785        assert_eq!(all_stored_commits.len(), 2);
1786    }
1787
1788    #[tokio::test]
1789    async fn test_core_propose_after_genesis() {
1790        telemetry_subscribers::init_for_testing();
1791        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1792            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
1793            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
1794            config
1795        });
1796
1797        let (context, mut key_pairs) = Context::new_for_test(4);
1798        let context = Arc::new(context);
1799        let store = Arc::new(MemStore::new());
1800        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1801
1802        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1803        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1804        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1805        let (blocks_sender, _blocks_receiver) =
1806            monitored_mpsc::unbounded_channel("consensus_block_output");
1807        let transaction_certifier = TransactionCertifier::new(
1808            context.clone(),
1809            Arc::new(NoopBlockVerifier {}),
1810            dag_state.clone(),
1811            blocks_sender,
1812        );
1813        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1814        // Need at least one subscriber to the block broadcast channel.
1815        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1816        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1817            context.clone(),
1818            dag_state.clone(),
1819        ));
1820
1821        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1822        let commit_observer = CommitObserver::new(
1823            context.clone(),
1824            commit_consumer,
1825            dag_state.clone(),
1826            transaction_certifier.clone(),
1827            leader_schedule.clone(),
1828        )
1829        .await;
1830
1831        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1832        let mut core = Core::new(
1833            context.clone(),
1834            leader_schedule,
1835            transaction_consumer,
1836            transaction_certifier,
1837            block_manager,
1838            commit_observer,
1839            signals,
1840            key_pairs.remove(context.own_index.value()).1,
1841            dag_state.clone(),
1842            false,
1843            round_tracker,
1844        );
1845
1846        // Send some transactions
1847        let mut total = 0;
1848        let mut index = 0;
1849        loop {
1850            let transaction =
1851                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1852            total += transaction.len();
1853            index += 1;
1854            let _w = transaction_client
1855                .submit_no_wait(vec![transaction])
1856                .await
1857                .unwrap();
1858
1859            // Create total size of transactions up to 1KB
1860            if total >= 1_000 {
1861                break;
1862            }
1863        }
1864
1865        // a new block should have been created during recovery.
1866        let extended_block = block_receiver
1867            .recv()
1868            .await
1869            .expect("A new block should have been created");
1870
1871        // A new block created - assert the details
1872        assert_eq!(extended_block.block.round(), 1);
1873        assert_eq!(extended_block.block.author().value(), 0);
1874        assert_eq!(extended_block.block.ancestors().len(), 4);
1875
1876        let mut total = 0;
1877        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1878            total += transaction.data().len() as u64;
1879            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1880            assert_eq!(format!("Transaction {i}"), transaction);
1881        }
1882        assert!(total <= context.protocol_config.max_transactions_in_block_bytes());
1883
1884        // genesis blocks should be referenced
1885        let all_genesis = genesis_blocks(&context);
1886
1887        for ancestor in extended_block.block.ancestors() {
1888            all_genesis
1889                .iter()
1890                .find(|block| block.reference() == *ancestor)
1891                .expect("Block should be found amongst genesis blocks");
1892        }
1893
1894        // Try to propose again - with or without the ignore-leaders check, it will not return any block.
1895        assert!(core.try_propose(false).unwrap().is_none());
1896        assert!(core.try_propose(true).unwrap().is_none());
1897
1898        // Flush the DAG state to storage.
1899        dag_state.write().flush();
1900
1901        // Check no commits have been persisted to dag_state & store
1902        let last_commit = store.read_last_commit().unwrap();
1903        assert!(last_commit.is_none());
1904        assert_eq!(dag_state.read().last_commit_index(), 0);
1905    }
1906
1907    #[tokio::test]
1908    async fn test_core_propose_once_receiving_a_quorum() {
1909        telemetry_subscribers::init_for_testing();
1910        let (context, _key_pairs) = Context::new_for_test(4);
1911        let mut core_fixture = CoreTextFixture::new(
1912            context.clone(),
1913            vec![1, 1, 1, 1],
1914            AuthorityIndex::new_for_test(0),
1915            false,
1916        )
1917        .await;
1918        let transaction_certifier = &core_fixture.transaction_certifier;
1919        let store = &core_fixture.store;
1920        let dag_state = &core_fixture.dag_state;
1921        let core = &mut core_fixture.core;
1922
1923        let mut expected_ancestors = BTreeSet::new();
1924
1925        // Adding one block now will trigger the creation of a new block for round 1.
1926        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1927        expected_ancestors.insert(block_1.reference());
1928        // Wait for min round delay to allow blocks to be proposed.
1929        sleep(context.parameters.min_round_delay).await;
1930        // add blocks to trigger proposal.
1931        transaction_certifier.add_voted_blocks(vec![(block_1.clone(), vec![])]);
1932        _ = core.add_blocks(vec![block_1]);
1933
1934        assert_eq!(core.last_proposed_round(), 1);
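        // Our own round 1 proposal will also be an ancestor of the upcoming round 2 block.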
1935        expected_ancestors.insert(core.last_proposed_block().reference());
1936        // attempt to create a block - none will be produced.
1937        assert!(core.try_propose(false).unwrap().is_none());
1938
1939        // Adding another block now forms a quorum for round 1, so a block at round 2 will be proposed.
1940        let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1941        expected_ancestors.insert(block_2.reference());
1942        // Wait for min round delay to allow blocks to be proposed.
1943        sleep(context.parameters.min_round_delay).await;
1944        // add blocks to trigger proposal.
1945        transaction_certifier.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
1946        _ = core.add_blocks(vec![block_2.clone()]);
1947
1948        assert_eq!(core.last_proposed_round(), 2);
1949
1950        let proposed_block = core.last_proposed_block();
1951        assert_eq!(proposed_block.round(), 2);
1952        assert_eq!(proposed_block.author(), context.own_index);
1953        assert_eq!(proposed_block.ancestors().len(), 3);
1954        let ancestors = proposed_block.ancestors();
1955        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
1956        assert_eq!(ancestors, expected_ancestors);
1957
1958        let transaction_votes = proposed_block.transaction_votes();
1959        assert_eq!(transaction_votes.len(), 1);
1960        let transaction_vote = transaction_votes.first().unwrap();
1961        assert_eq!(transaction_vote.block_ref, block_2.reference());
1962        assert_eq!(transaction_vote.rejects, vec![1, 4]);
1963
1964        // Flush the DAG state to storage.
1965        dag_state.write().flush();
1966
1967        // Check no commits have been persisted to dag_state & store
1968        let last_commit = store.read_last_commit().unwrap();
1969        assert!(last_commit.is_none());
1970        assert_eq!(dag_state.read().last_commit_index(), 0);
1971    }
1972
1973    #[tokio::test]
1974    async fn test_commit_and_notify_for_block_status() {
1975        telemetry_subscribers::init_for_testing();
1976        let (mut context, mut key_pairs) = Context::new_for_test(4);
1977        const GC_DEPTH: u32 = 2;
1978
1979        context
1980            .protocol_config
1981            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1982
1983        let context = Arc::new(context);
1984
1985        let store = Arc::new(MemStore::new());
1986        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1987        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1988        let mut block_status_subscriptions = FuturesUnordered::new();
1989
1990        let dag_str = "DAG {
1991            Round 0 : { 4 },
1992            Round 1 : { * },
1993            Round 2 : { * },
1994            Round 3 : {
1995                A -> [*],
1996                B -> [-A2],
1997                C -> [-A2],
1998                D -> [-A2],
1999            },
2000            Round 4 : { 
2001                B -> [-A3],
2002                C -> [-A3],
2003                D -> [-A3],
2004            },
2005            Round 5 : { 
2006                A -> [A3, B4, C4, D4]
2007                B -> [*],
2008                C -> [*],
2009                D -> [*],
2010            },
2011            Round 6 : { * },
2012            Round 7 : { * },
2013            Round 8 : { * },
2014        }";
2015
2016        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2017        dag_builder.print();
2018
2019        // Subscribe to all created "own" blocks. We know that for our node (A) we'll be able to commit up to round 5.
2020        for block in dag_builder.blocks(1..=5) {
2021            if block.author() == context.own_index {
2022                let subscription =
2023                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
2024                block_status_subscriptions.push(subscription);
2025            }
2026        }
2027
2028        // write them in store
2029        store
2030            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2031            .expect("Storage error");
2032
2033        // create dag state after all blocks have been written to store
2034        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2035        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2036        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2037            context.clone(),
2038            dag_state.clone(),
2039        ));
2040        let (blocks_sender, _blocks_receiver) =
2041            monitored_mpsc::unbounded_channel("consensus_block_output");
2042        let transaction_certifier = TransactionCertifier::new(
2043            context.clone(),
2044            Arc::new(NoopBlockVerifier {}),
2045            dag_state.clone(),
2046            blocks_sender,
2047        );
2048
2049        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2050        let commit_observer = CommitObserver::new(
2051            context.clone(),
2052            commit_consumer,
2053            dag_state.clone(),
2054            transaction_certifier.clone(),
2055            leader_schedule.clone(),
2056        )
2057        .await;
2058
2059        // Flush the DAG state to storage.
2060        dag_state.write().flush();
2061
2062        // Check no commits have been persisted to dag_state or store.
2063        let last_commit = store.read_last_commit().unwrap();
2064        assert!(last_commit.is_none());
2065        assert_eq!(dag_state.read().last_commit_index(), 0);
2066
2067        // Now recover Core and other components.
2068        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2069        let (blocks_sender, _blocks_receiver) =
2070            monitored_mpsc::unbounded_channel("consensus_block_output");
2071        let transaction_certifier = TransactionCertifier::new(
2072            context.clone(),
2073            Arc::new(NoopBlockVerifier {}),
2074            dag_state.clone(),
2075            blocks_sender,
2076        );
2077        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
2078        // Need at least one subscriber to the block broadcast channel.
2079        let _block_receiver = signal_receivers.block_broadcast_receiver();
2080        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2081        let _core = Core::new(
2082            context.clone(),
2083            leader_schedule,
2084            transaction_consumer,
2085            transaction_certifier,
2086            block_manager,
2087            commit_observer,
2088            signals,
2089            key_pairs.remove(context.own_index.value()).1,
2090            dag_state.clone(),
2091            false,
2092            round_tracker,
2093        );
2094
2095        // Flush the DAG state to storage.
2096        dag_state.write().flush();
2097
2098        let last_commit = store
2099            .read_last_commit()
2100            .unwrap()
2101            .expect("last commit should be set");
2102
2103        assert_eq!(last_commit.index(), 5);
2104
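        // Our blocks at rounds 1 and 5 should be sequenced, while the ones at rounds 2 and 3 - which the other authorities excluded - should be garbage collected.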
2105        while let Some(result) = block_status_subscriptions.next().await {
2106            let status = result.unwrap();
2107
2108            match status {
2109                BlockStatus::Sequenced(block_ref) => {
2110                    assert!(block_ref.round == 1 || block_ref.round == 5);
2111                }
2112                BlockStatus::GarbageCollected(block_ref) => {
2113                    assert!(block_ref.round == 2 || block_ref.round == 3);
2114                }
2115            }
2116        }
2117    }
2118
2119    // Tests that the threshold clock advances when blocks get unsuspended due to GC, and that newly created blocks are always higher
2120    // than the last advanced gc round.
2121    #[tokio::test]
2122    async fn test_multiple_commits_advance_threshold_clock() {
2123        telemetry_subscribers::init_for_testing();
2124        let (mut context, mut key_pairs) = Context::new_for_test(4);
2125        const GC_DEPTH: u32 = 2;
2126
2127        context
2128            .protocol_config
2129            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2130
2131        let context = Arc::new(context);
2132
2133        let store = Arc::new(MemStore::new());
2134        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2135        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2136
2137        // In round 1 we do produce the block for authority D, but we do not link to it until round 6. This makes round 6 unable to be processed
2138        // until the leader of round 3 is committed, at which point round 1 gets garbage collected.
2139        // Then we add more rounds so we can trigger a commit for the leader of round 9, which will move the gc round to 7.
2140        let dag_str = "DAG {
2141            Round 0 : { 4 },
2142            Round 1 : { * },
2143            Round 2 : { 
2144                B -> [-D1],
2145                C -> [-D1],
2146                D -> [-D1],
2147            },
2148            Round 3 : {
2149                B -> [*],
2150                C -> [*]
2151                D -> [*],
2152            },
2153            Round 4 : { 
2154                A -> [*],
2155                B -> [*],
2156                C -> [*]
2157                D -> [*],
2158            },
2159            Round 5 : { 
2160                A -> [*],
2161                B -> [*],
2162                C -> [*],
2163                D -> [*],
2164            },
2165            Round 6 : { 
2166                B -> [A5, B5, C5, D1],
2167                C -> [A5, B5, C5, D1],
2168                D -> [A5, B5, C5, D1],
2169            },
2170            Round 7 : { 
2171                B -> [*],
2172                C -> [*],
2173                D -> [*],
2174            },
2175            Round 8 : { 
2176                B -> [*],
2177                C -> [*],
2178                D -> [*],
2179            },
2180            Round 9 : { 
2181                B -> [*],
2182                C -> [*],
2183                D -> [*],
2184            },
2185            Round 10 : { 
2186                B -> [*],
2187                C -> [*],
2188                D -> [*],
2189            },
2190            Round 11 : { 
2191                B -> [*],
2192                C -> [*],
2193                D -> [*],
2194            },
2195        }";
2196
2197        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2198        dag_builder.print();
2199
2200        // create dag state after all blocks have been written to store
2201        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2202        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2203        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2204            context.clone(),
2205            dag_state.clone(),
2206        ));
2207        let (blocks_sender, _blocks_receiver) =
2208            monitored_mpsc::unbounded_channel("consensus_block_output");
2209        let transaction_certifier = TransactionCertifier::new(
2210            context.clone(),
2211            Arc::new(NoopBlockVerifier {}),
2212            dag_state.clone(),
2213            blocks_sender,
2214        );
2215
2216        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2217        let commit_observer = CommitObserver::new(
2218            context.clone(),
2219            commit_consumer,
2220            dag_state.clone(),
2221            transaction_certifier.clone(),
2222            leader_schedule.clone(),
2223        )
2224        .await;
2225
2226        // Flush the DAG state to storage.
2227        dag_state.write().flush();
2228
2229        // Check no commits have been persisted to dag_state or store.
2230        let last_commit = store.read_last_commit().unwrap();
2231        assert!(last_commit.is_none());
2232        assert_eq!(dag_state.read().last_commit_index(), 0);
2233
2234        // Now spin up core
2235        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2236        let (blocks_sender, _blocks_receiver) =
2237            monitored_mpsc::unbounded_channel("consensus_block_output");
2238        let transaction_certifier = TransactionCertifier::new(
2239            context.clone(),
2240            Arc::new(NoopBlockVerifier {}),
2241            dag_state.clone(),
2242            blocks_sender,
2243        );
2244        // Need at least one subscriber to the block broadcast channel.
2245        let _block_receiver = signal_receivers.block_broadcast_receiver();
2246        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2247        let mut core = Core::new(
2248            context.clone(),
2249            leader_schedule,
2250            transaction_consumer,
2251            transaction_certifier.clone(),
2252            block_manager,
2253            commit_observer,
2254            signals,
2255            key_pairs.remove(context.own_index.value()).1,
2256            dag_state.clone(),
2257            true,
2258            round_tracker,
2259        );
2260        // We set the last known round to 4 so we avoid creating new blocks until then - otherwise Core would crash, as the already created DAG contains blocks for this
2261        // authority.
2262        core.set_last_known_proposed_round(4);
2263
2264        // We add all the blocks except D1. The only ones we can immediately accept are the ones up to round 5, as they don't have a dependency on D1. The rest of the blocks have a causal
2265        // dependency on D1, so they can't be processed until the leader of round 3 gets committed and the gc round moves to 1. That will make all the blocks that depend on D1 get accepted.
2266        // However, our threshold clock is now at round 6, as the last quorum that we managed to process was round 5.
2267        // As commits happen, blocks of later rounds get accepted and more leaders get committed. Eventually the leader of round 9 gets committed and gc moves to 9 - 2 = 7.
2268        // If our node attempted to propose a block at threshold clock round 6, the acceptance checks would fail, as gc has now moved far past this round.
2269        let mut all_blocks = dag_builder.blocks(1..=11);
2270        all_blocks.sort_by_key(|b| b.round());
2271        let voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)> =
2272            all_blocks.iter().map(|b| (b.clone(), vec![])).collect();
2273        transaction_certifier.add_voted_blocks(voted_blocks);
2274        let blocks: Vec<VerifiedBlock> = all_blocks
2275            .into_iter()
2276            .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2277            .collect();
2278        core.add_blocks(blocks).expect("Should not fail");
2279
2280        assert_eq!(core.last_proposed_round(), 12);
2281    }
2282
2283    #[tokio::test]
2284    async fn test_core_set_min_propose_round() {
2285        telemetry_subscribers::init_for_testing();
2286        let (context, mut key_pairs) = Context::new_for_test(4);
2287        let context = Arc::new(context.with_parameters(Parameters {
2288            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2289            ..Default::default()
2290        }));
2291
2292        let store = Arc::new(MemStore::new());
2293        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2294
2295        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2296        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2297            context.clone(),
2298            dag_state.clone(),
2299        ));
2300
2301        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2302        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2303        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2304        let (blocks_sender, _blocks_receiver) =
2305            monitored_mpsc::unbounded_channel("consensus_block_output");
2306        let transaction_certifier = TransactionCertifier::new(
2307            context.clone(),
2308            Arc::new(NoopBlockVerifier {}),
2309            dag_state.clone(),
2310            blocks_sender,
2311        );
2312        // Need at least one subscriber to the block broadcast channel.
2313        let _block_receiver = signal_receivers.block_broadcast_receiver();
2314
2315        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2316        let commit_observer = CommitObserver::new(
2317            context.clone(),
2318            commit_consumer,
2319            dag_state.clone(),
2320            transaction_certifier.clone(),
2321            leader_schedule.clone(),
2322        )
2323        .await;
2324
2325        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2326        let mut core = Core::new(
2327            context.clone(),
2328            leader_schedule,
2329            transaction_consumer,
2330            transaction_certifier.clone(),
2331            block_manager,
2332            commit_observer,
2333            signals,
2334            key_pairs.remove(context.own_index.value()).1,
2335            dag_state.clone(),
2336            true,
2337            round_tracker,
2338        );
2339
2340        // No new block should have been produced
2341        assert_eq!(
2342            core.last_proposed_round(),
2343            GENESIS_ROUND,
2344            "No block should have been created other than genesis"
2345        );
2346
2347        // Trying to explicitly propose a block will not produce anything
2348        assert!(core.try_propose(true).unwrap().is_none());
2349
2350        // Create blocks for the whole network - even for "our" node, in order to replicate an "amnesia" recovery.
2351        let mut builder = DagBuilder::new(context.clone());
2352        builder.layers(1..=10).build();
2353
2354        let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2355
2356        // Process all the blocks
2357        transaction_certifier
2358            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2359        assert!(core.add_blocks(blocks).unwrap().is_empty());
2360
2361        core.round_tracker.write().update_from_probe(
2362            vec![
2363                vec![10, 10, 10, 10],
2364                vec![10, 10, 10, 10],
2365                vec![10, 10, 10, 10],
2366                vec![10, 10, 10, 10],
2367            ],
2368            vec![
2369                vec![10, 10, 10, 10],
2370                vec![10, 10, 10, 10],
2371                vec![10, 10, 10, 10],
2372                vec![10, 10, 10, 10],
2373            ],
2374        );
2375
2376        // Try to propose - no block should be produced.
2377        assert!(core.try_propose(true).unwrap().is_none());
2378
2379        // Now set the last known proposed round, which is the highest round for which the network informed
2380        // us that we have proposed a block.
2381        core.set_last_known_proposed_round(10);
2382
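        // With the last known proposed round set, Core can now propose for round 11 on top of the round 10 ancestors.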
2383        let block = core.try_propose(true).expect("No error").unwrap();
2384        assert_eq!(block.round(), 11);
2385        assert_eq!(block.ancestors().len(), 4);
2386
2387        let our_ancestor_included = block.ancestors()[0];
2388        assert_eq!(our_ancestor_included.author, context.own_index);
2389        assert_eq!(our_ancestor_included.round, 10);
2390    }
2391
2392    #[tokio::test(flavor = "current_thread", start_paused = true)]
2393    async fn test_core_try_new_block_leader_timeout() {
2394        telemetry_subscribers::init_for_testing();
2395
2396        // Since we run the test with start_paused = true, any time-dependent operations using Tokio's time
2397        // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each
2398        // Core's clock will have been initialised with potentially different values, but it never advances.
2399        // To ensure that blocks won't get rejected by cores, we need to manually wait for the time
2400        // diff before processing them. By calling `tokio::time::sleep` we also implicitly advance the
2401        // tokio clock.
2402        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2403            // Simulate the time wait before processing a block to ensure that block.timestamp <= now
2404            let now = context.clock.timestamp_utc_ms();
2405            let max_timestamp = blocks
2406                .iter()
2407                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2408                .map(|block| block.timestamp_ms())
2409                .unwrap_or(0);
2410
2411            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2412            sleep(wait_time).await;
2413        }
2414
2415        let (context, _) = Context::new_for_test(4);
2416        // Create the cores for all authorities
2417        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
2418
2419        // Create blocks for rounds 1..=3 from all Cores except the last Core of authority 3, so we miss its blocks. As
2420        // it will be the leader of round 3, no one will be able to progress to round 4 unless we explicitly trigger
2421        // block creation.
2422        // Split off the last core (authority 3); it will not be producing blocks.
2423        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2424
2425        // Now iterate over a few rounds and ensure the corresponding signals are created while network advances
2426        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2427        for round in 1..=3 {
2428            let mut this_round_blocks = Vec::new();
2429
2430            for core_fixture in cores.iter_mut() {
2431                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2432
2433                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2434
2435                // Only when round > 1 and using non-genesis parents.
2436                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2437                    assert_eq!(round - 1, r);
2438                    if core_fixture.core.last_proposed_round() == r {
2439                        // Force propose new block regardless of min round delay.
2440                        core_fixture
2441                            .core
2442                            .try_propose(true)
2443                            .unwrap()
2444                            .unwrap_or_else(|| {
2445                                panic!("Block should have been proposed for round {}", round)
2446                            });
2447                    }
2448                }
2449
2450                assert_eq!(core_fixture.core.last_proposed_round(), round);
2451
2452                this_round_blocks.push(core_fixture.core.last_proposed_block());
2453            }
2454
2455            last_round_blocks = this_round_blocks;
2456        }
2457
2458        // Try to create the blocks for round 4 by calling the try_propose() method. No block should be created as the
2459        // leader - authority 3 - hasn't proposed any block.
2460        for core_fixture in cores.iter_mut() {
2461            wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2462
2463            core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2464            assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2465        }
2466
2467        // Now try to create the blocks for round 4 via the leader timeout method which should
2468        // ignore any leader checks or min round delay.
2469        for core_fixture in cores.iter_mut() {
2470            assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2471            assert_eq!(core_fixture.core.last_proposed_round(), 4);
2472
2473            // Flush the DAG state to storage.
2474            core_fixture.dag_state.write().flush();
2475
2476            // Check commits have been persisted to store
2477            let last_commit = core_fixture
2478                .store
2479                .read_last_commit()
2480                .unwrap()
2481                .expect("last commit should be set");
2482            // There is 1 leader round with rounds completed up to and including
2483            // round 4.
2484            assert_eq!(last_commit.index(), 1);
2485            let all_stored_commits = core_fixture
2486                .store
2487                .scan_commits((0..=CommitIndex::MAX).into())
2488                .unwrap();
2489            assert_eq!(all_stored_commits.len(), 1);
2490        }
2491    }
2492
2493    #[tokio::test(flavor = "current_thread", start_paused = true)]
2494    async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2495        telemetry_subscribers::init_for_testing();
2496
2497        // Since we run the test with start_paused = true, any time-dependent operations using Tokio's time
2498        // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each
2499        // Core's clock will have been initialised with potentially different values, but it never advances.
2500        // To ensure that blocks won't get rejected by cores, we need to manually wait for the time
2501        // diff before processing them. By calling `tokio::time::sleep` we also implicitly advance the
2502        // tokio clock.
2503        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2504            // Simulate the time wait before processing a block to ensure that block.timestamp <= now
2505            let now = context.clock.timestamp_utc_ms();
2506            let max_timestamp = blocks
2507                .iter()
2508                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2509                .map(|block| block.timestamp_ms())
2510                .unwrap_or(0);
2511
2512            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2513            sleep(wait_time).await;
2514        }
2515
2516        let (mut context, _) = Context::new_for_test(5);
2517        context
2518            .protocol_config
2519            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2520
2521        // Create the cores for all authorities
2522        let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
2523        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2524
2525        // Create blocks for rounds 1..=30 from all Cores except last Core of authority 4.
2526        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2527        for round in 1..=30 {
2528            let mut this_round_blocks = Vec::new();
2529
2530            for core_fixture in cores.iter_mut() {
2531                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2532
2533                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2534
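                // Report probe results for this round: received rounds (first argument) and accepted
                // rounds (second argument) per authority. Authority 4's entries stay at 0 since it is
                // not producing blocks.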
2535                core_fixture.core.round_tracker.write().update_from_probe(
2536                    vec![
2537                        vec![round, round, round, round, 0],
2538                        vec![round, round, round, round, 0],
2539                        vec![round, round, round, round, 0],
2540                        vec![round, round, round, round, 0],
2541                        vec![0, 0, 0, 0, 0],
2542                    ],
2543                    vec![
2544                        vec![round, round, round, round, 0],
2545                        vec![round, round, round, round, 0],
2546                        vec![round, round, round, round, 0],
2547                        vec![round, round, round, round, 0],
2548                        vec![0, 0, 0, 0, 0],
2549                    ],
2550                );
2551
2552                // Only try to force a proposal when round > 1, i.e. when building on non-genesis parents.
2553                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2554                    assert_eq!(round - 1, r);
2555                    if core_fixture.core.last_proposed_round() == r {
2556                        // Force propose new block regardless of min round delay.
2557                        core_fixture
2558                            .core
2559                            .try_propose(true)
2560                            .unwrap()
2561                            .unwrap_or_else(|| {
2562                                panic!("Block should have been proposed for round {}", round)
2563                            });
2564                    }
2565                }
2566
2567                assert_eq!(core_fixture.core.last_proposed_round(), round);
2568
2569                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2570            }
2571
2572            last_round_blocks = this_round_blocks;
2573        }
2574
2575        // Now produce blocks for all Cores
2576        for round in 31..=40 {
2577            let mut this_round_blocks = Vec::new();
2578
2579            for core_fixture in all_cores.iter_mut() {
2580                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2581
2582                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2583
2584                // Don't update probed rounds for authority 4 so it will remain
2585                // excluded.
2586                core_fixture.core.round_tracker.write().update_from_probe(
2587                    vec![
2588                        vec![round, round, round, round, 0],
2589                        vec![round, round, round, round, 0],
2590                        vec![round, round, round, round, 0],
2591                        vec![round, round, round, round, 0],
2592                        vec![0, 0, 0, 0, 0],
2593                    ],
2594                    vec![
2595                        vec![round, round, round, round, 0],
2596                        vec![round, round, round, round, 0],
2597                        vec![round, round, round, round, 0],
2598                        vec![round, round, round, round, 0],
2599                        vec![0, 0, 0, 0, 0],
2600                    ],
2601                );
2602
2603                // Only try to force a proposal when round > 1, i.e. when building on non-genesis parents.
2604                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2605                    assert_eq!(round - 1, r);
2606                    if core_fixture.core.last_proposed_round() == r {
2607                        // Force propose new block regardless of min round delay.
2608                        core_fixture
2609                            .core
2610                            .try_propose(true)
2611                            .unwrap()
2612                            .unwrap_or_else(|| {
2613                                panic!("Block should have been proposed for round {}", round)
2614                            });
2615                    }
2616                }
2617
2618                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2619
2620                for block in this_round_blocks.iter() {
2621                    if block.author() != AuthorityIndex::new_for_test(4) {
2622                        // Assert blocks created include only 4 ancestors per block as one
2623                        // should be excluded
2624                        assert_eq!(block.ancestors().len(), 4);
2625                    } else {
2626                        // Authority 4 is the low-scoring authority, so it will still include
2627                        // its own blocks.
2628                        assert_eq!(block.ancestors().len(), 5);
2629                    }
2630                }
2631            }
2632
2633            last_round_blocks = this_round_blocks;
2634        }
2635    }
2636
2637    #[tokio::test]
2638    async fn test_smart_ancestor_selection() {
2639        telemetry_subscribers::init_for_testing();
2640        let (mut context, mut key_pairs) = Context::new_for_test(7);
2641        context
2642            .protocol_config
2643            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2644        let context = Arc::new(context.with_parameters(Parameters {
2645            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2646            ..Default::default()
2647        }));
2648
2649        let store = Arc::new(MemStore::new());
2650        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2651
2652        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2653        let leader_schedule = Arc::new(
2654            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2655                .with_num_commits_per_schedule(10),
2656        );
2657
2658        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2659        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2660        let (blocks_sender, _blocks_receiver) =
2661            monitored_mpsc::unbounded_channel("consensus_block_output");
2662        let transaction_certifier = TransactionCertifier::new(
2663            context.clone(),
2664            Arc::new(NoopBlockVerifier {}),
2665            dag_state.clone(),
2666            blocks_sender,
2667        );
2668        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2669        // Need at least one subscriber to the block broadcast channel.
2670        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2671
2672        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2673        let commit_observer = CommitObserver::new(
2674            context.clone(),
2675            commit_consumer,
2676            dag_state.clone(),
2677            transaction_certifier.clone(),
2678            leader_schedule.clone(),
2679        )
2680        .await;
2681
2682        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2683        let mut core = Core::new(
2684            context.clone(),
2685            leader_schedule,
2686            transaction_consumer,
2687            transaction_certifier.clone(),
2688            block_manager,
2689            commit_observer,
2690            signals,
2691            key_pairs.remove(context.own_index.value()).1,
2692            dag_state.clone(),
2693            true,
2694            round_tracker.clone(),
2695        );
2696
2697        // No new block should have been produced
2698        assert_eq!(
2699            core.last_proposed_round(),
2700            GENESIS_ROUND,
2701            "No block should have been created other than genesis"
2702        );
2703
2704        // Trying to explicitly propose a block will not produce anything
2705        assert!(core.try_propose(true).unwrap().is_none());
2706
2707        // Create blocks for the whole network but not for authority 1
2708        let mut builder = DagBuilder::new(context.clone());
2709        builder
2710            .layers(1..=12)
2711            .authorities(vec![AuthorityIndex::new_for_test(1)])
2712            .skip_block()
2713            .build();
2714        let blocks = builder.blocks(1..=12);
2715        // Process all the blocks
2716        transaction_certifier
2717            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2718        assert!(core.add_blocks(blocks).unwrap().is_empty());
2719        core.set_last_known_proposed_round(12);
2720
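        // Report received and accepted rounds from the prober: every authority except authority 1
        // has caught up to round 12, while authority 1 (which produced no blocks) stays at 0.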
2721        round_tracker.write().update_from_probe(
2722            vec![
2723                vec![12, 12, 12, 12, 12, 12, 12],
2724                vec![0, 0, 0, 0, 0, 0, 0],
2725                vec![12, 12, 12, 12, 12, 12, 12],
2726                vec![12, 12, 12, 12, 12, 12, 12],
2727                vec![12, 12, 12, 12, 12, 12, 12],
2728                vec![12, 12, 12, 12, 12, 12, 12],
2729                vec![12, 12, 12, 12, 12, 12, 12],
2730            ],
2731            vec![
2732                vec![12, 12, 12, 12, 12, 12, 12],
2733                vec![0, 0, 0, 0, 0, 0, 0],
2734                vec![12, 12, 12, 12, 12, 12, 12],
2735                vec![12, 12, 12, 12, 12, 12, 12],
2736                vec![12, 12, 12, 12, 12, 12, 12],
2737                vec![12, 12, 12, 12, 12, 12, 12],
2738                vec![12, 12, 12, 12, 12, 12, 12],
2739            ],
2740        );
2741
2742        let block = core.try_propose(true).expect("No error").unwrap();
2743        assert_eq!(block.round(), 13);
2744        assert_eq!(block.ancestors().len(), 7);
2745
2746        // Build blocks for the rest of the network other than our own index
2747        builder
2748            .layers(13..=14)
2749            .authorities(vec![AuthorityIndex::new_for_test(0)])
2750            .skip_block()
2751            .build();
2752        let blocks = builder.blocks(13..=14);
2753        transaction_certifier
2754            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2755        assert!(core.add_blocks(blocks).unwrap().is_empty());
2756
2757        // We now have triggered a leader schedule change so we should have
2758        // one EXCLUDE authority (1) when we go to select ancestors for the next proposal
2759        let block = core.try_propose(true).expect("No error").unwrap();
2760        assert_eq!(block.round(), 15);
2761        assert_eq!(block.ancestors().len(), 6);
2762
2763        // Build blocks for a quorum of the network including the EXCLUDE authority (1)
2764        // which will trigger smart select and we will not propose a block
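        // Keep the round 14 ancestors so a second batch of round 15 blocks can later be built on the
        // same parents via override_last_ancestors().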
2765        let round_14_ancestors = builder.last_ancestors.clone();
2766        builder
2767            .layer(15)
2768            .authorities(vec![
2769                AuthorityIndex::new_for_test(0),
2770                AuthorityIndex::new_for_test(5),
2771                AuthorityIndex::new_for_test(6),
2772            ])
2773            .skip_block()
2774            .build();
2775        let blocks = builder.blocks(15..=15);
2776        let authority_1_excluded_block_reference = blocks
2777            .iter()
2778            .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2779            .unwrap()
2780            .reference();
2781        // Wait for min round delay to allow blocks to be proposed.
2782        sleep(context.parameters.min_round_delay).await;
2783        // Smart select should be triggered and no block should be proposed.
2784        transaction_certifier
2785            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2786        assert!(core.add_blocks(blocks).unwrap().is_empty());
2787        assert_eq!(core.last_proposed_block().round(), 15);
2788
2789        builder
2790            .layer(15)
2791            .authorities(vec![
2792                AuthorityIndex::new_for_test(0),
2793                AuthorityIndex::new_for_test(1),
2794                AuthorityIndex::new_for_test(2),
2795                AuthorityIndex::new_for_test(3),
2796                AuthorityIndex::new_for_test(4),
2797            ])
2798            .skip_block()
2799            .override_last_ancestors(round_14_ancestors)
2800            .build();
2801        let blocks = builder.blocks(15..=15);
2802        let round_15_ancestors: Vec<BlockRef> = blocks
2803            .iter()
2804            .filter(|block| block.round() == 15)
2805            .map(|block| block.reference())
2806            .collect();
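        // Expected ancestors of our next (round 16) proposal: our own round 15 block plus the round 15
        // blocks from every other authority except the excluded authority 1.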
2807        let included_block_references = iter::once(&core.last_proposed_block())
2808            .chain(blocks.iter())
2809            .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2810            .map(|block| block.reference())
2811            .collect::<Vec<_>>();
2812
2813        // Have enough ancestor blocks to propose now.
2814        transaction_certifier
2815            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2816        assert!(core.add_blocks(blocks).unwrap().is_empty());
2817        assert_eq!(core.last_proposed_block().round(), 16);
2818
2819        // Check that a new block has been proposed & signaled.
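        // The broadcast receiver also holds our earlier proposals (e.g. rounds 13 and 15), so drain
        // the channel until the round 16 block shows up.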
2820        let extended_block = loop {
2821            let extended_block =
2822                tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2823                    .await
2824                    .unwrap()
2825                    .unwrap();
2826            if extended_block.block.round() == 16 {
2827                break extended_block;
2828            }
2829        };
2830        assert_eq!(extended_block.block.round(), 16);
2831        assert_eq!(extended_block.block.author(), core.context.own_index);
2832        assert_eq!(extended_block.block.ancestors().len(), 6);
2833        assert_eq!(extended_block.block.ancestors(), included_block_references);
2834        assert_eq!(extended_block.excluded_ancestors.len(), 1);
2835        assert_eq!(
2836            extended_block.excluded_ancestors[0],
2837            authority_1_excluded_block_reference
2838        );
2839
2840        // Build blocks for a quorum of the network including the EXCLUDE ancestor
2841        // which will trigger smart select and we will not propose a block.
2842        // This time we will force a proposal by hitting the leader timeout, which
2843        // should cause us to include this EXCLUDE ancestor.
2844        builder
2845            .layer(16)
2846            .authorities(vec![
2847                AuthorityIndex::new_for_test(0),
2848                AuthorityIndex::new_for_test(5),
2849                AuthorityIndex::new_for_test(6),
2850            ])
2851            .skip_block()
2852            .override_last_ancestors(round_15_ancestors)
2853            .build();
2854        let blocks = builder.blocks(16..=16);
2855        // Wait for the min round delay before adding the blocks.
2856        sleep(context.parameters.min_round_delay).await;
2857        // Smart select should be triggered and no block should be proposed.
2858        transaction_certifier
2859            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2860        assert!(core.add_blocks(blocks).unwrap().is_empty());
2861        assert_eq!(core.last_proposed_block().round(), 16);
2862
2863        // Simulate a leader timeout and a force proposal where we will include
2864        // one EXCLUDE ancestor when we go to select ancestors for the next proposal
2865        let block = core.try_propose(true).expect("No error").unwrap();
2866        assert_eq!(block.round(), 17);
2867        assert_eq!(block.ancestors().len(), 5);
2868
2869        // Check that a new block has been proposed & signaled.
2870        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2871            .await
2872            .unwrap()
2873            .unwrap();
2874        assert_eq!(extended_block.block.round(), 17);
2875        assert_eq!(extended_block.block.author(), core.context.own_index);
2876        assert_eq!(extended_block.block.ancestors().len(), 5);
2877        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2878
2879        // Excluded authority is locked until round 20, simulate enough rounds to
2880        // unlock
2881        builder
2882            .layers(17..=22)
2883            .authorities(vec![AuthorityIndex::new_for_test(0)])
2884            .skip_block()
2885            .build();
2886        let blocks = builder.blocks(17..=22);
2887
2888        // Simulate updating received and accepted rounds from the prober.
2889        // New quorum rounds can then be computed, which will unlock the
2890        // excluded authority (1). We should then be able to create a new
2891        // layer of blocks which will all be included as ancestors for the
2892        // next proposal.
2893        round_tracker.write().update_from_probe(
2894            vec![
2895                vec![22, 22, 22, 22, 22, 22, 22],
2896                vec![22, 22, 22, 22, 22, 22, 22],
2897                vec![22, 22, 22, 22, 22, 22, 22],
2898                vec![22, 22, 22, 22, 22, 22, 22],
2899                vec![22, 22, 22, 22, 22, 22, 22],
2900                vec![22, 22, 22, 22, 22, 22, 22],
2901                vec![22, 22, 22, 22, 22, 22, 22],
2902            ],
2903            vec![
2904                vec![22, 22, 22, 22, 22, 22, 22],
2905                vec![22, 22, 22, 22, 22, 22, 22],
2906                vec![22, 22, 22, 22, 22, 22, 22],
2907                vec![22, 22, 22, 22, 22, 22, 22],
2908                vec![22, 22, 22, 22, 22, 22, 22],
2909                vec![22, 22, 22, 22, 22, 22, 22],
2910                vec![22, 22, 22, 22, 22, 22, 22],
2911            ],
2912        );
2913
2914        let included_block_references = iter::once(&core.last_proposed_block())
2915            .chain(blocks.iter())
2916            .filter(|block| block.round() == 22 || block.author() == core.context.own_index)
2917            .map(|block| block.reference())
2918            .collect::<Vec<_>>();
2919
2920        // Have enough ancestor blocks to propose now.
2921        sleep(context.parameters.min_round_delay).await;
2922        transaction_certifier
2923            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2924        assert!(core.add_blocks(blocks).unwrap().is_empty());
2925        assert_eq!(core.last_proposed_block().round(), 23);
2926
2927        // Check that a new block has been proposed & signaled.
2928        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2929            .await
2930            .unwrap()
2931            .unwrap();
2932        assert_eq!(extended_block.block.round(), 23);
2933        assert_eq!(extended_block.block.author(), core.context.own_index);
2934        assert_eq!(extended_block.block.ancestors().len(), 7);
2935        assert_eq!(extended_block.block.ancestors(), included_block_references);
2936        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2937    }
2938
2939    #[tokio::test]
2940    async fn test_excluded_ancestor_limit() {
2941        telemetry_subscribers::init_for_testing();
2942        let (context, mut key_pairs) = Context::new_for_test(4);
2943        let context = Arc::new(context.with_parameters(Parameters {
2944            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2945            ..Default::default()
2946        }));
2947
2948        let store = Arc::new(MemStore::new());
2949        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2950
2951        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2952        let leader_schedule = Arc::new(
2953            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2954                .with_num_commits_per_schedule(10),
2955        );
2956
2957        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2958        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2959        let (blocks_sender, _blocks_receiver) =
2960            monitored_mpsc::unbounded_channel("consensus_block_output");
2961        let transaction_certifier = TransactionCertifier::new(
2962            context.clone(),
2963            Arc::new(NoopBlockVerifier {}),
2964            dag_state.clone(),
2965            blocks_sender,
2966        );
2967        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2968        // Need at least one subscriber to the block broadcast channel.
2969        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2970
2971        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2972        let commit_observer = CommitObserver::new(
2973            context.clone(),
2974            commit_consumer,
2975            dag_state.clone(),
2976            transaction_certifier.clone(),
2977            leader_schedule.clone(),
2978        )
2979        .await;
2980
2981        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2982        let mut core = Core::new(
2983            context.clone(),
2984            leader_schedule,
2985            transaction_consumer,
2986            transaction_certifier.clone(),
2987            block_manager,
2988            commit_observer,
2989            signals,
2990            key_pairs.remove(context.own_index.value()).1,
2991            dag_state.clone(),
2992            true,
2993            round_tracker,
2994        );
2995
2996        // No new block should have been produced
2997        assert_eq!(
2998            core.last_proposed_round(),
2999            GENESIS_ROUND,
3000            "No block should have been created other than genesis"
3001        );
3002
3003        // Create blocks for the whole network
3004        let mut builder = DagBuilder::new(context.clone());
3005        builder.layers(1..=3).build();
3006
3007        // This will equivocate 9 blocks for authority 1. They will be excluded from
3008        // the proposal, but because of the limit set, some will be dropped and not included
3009        // as part of the ExtendedBlock structure sent to the rest of the network.
3010        builder
3011            .layer(4)
3012            .authorities(vec![AuthorityIndex::new_for_test(1)])
3013            .equivocate(9)
3014            .build();
3015        let blocks = builder.blocks(1..=4);
3016
3017        // Process all the blocks
3018        transaction_certifier
3019            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
3020        assert!(core.add_blocks(blocks).unwrap().is_empty());
3021        core.set_last_known_proposed_round(3);
3022
3023        let block = core.try_propose(true).expect("No error").unwrap();
3024        assert_eq!(block.round(), 5);
3025        assert_eq!(block.ancestors().len(), 4);
3026
3027        // Check that a new block has been proposed & signaled.
3028        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3029            .await
3030            .unwrap()
3031            .unwrap();
3032        assert_eq!(extended_block.block.round(), 5);
3033        assert_eq!(extended_block.block.author(), core.context.own_index);
3034        assert_eq!(extended_block.block.ancestors().len(), 4);
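        // Only one block per authority can be included as an ancestor, so authority 1's remaining
        // equivocating round 4 blocks are reported as excluded ancestors, subject to the
        // excluded-ancestor limit mentioned above (hence 8).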
3035        assert_eq!(extended_block.excluded_ancestors.len(), 8);
3036    }
3037
3038    #[tokio::test]
3039    async fn test_core_set_propagation_delay_per_authority() {
3040        // TODO: create helper to avoid the duplicated code here.
3041        telemetry_subscribers::init_for_testing();
3042        let (context, mut key_pairs) = Context::new_for_test(4);
3043        let context = Arc::new(context);
3044        let store = Arc::new(MemStore::new());
3045        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3046
3047        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3048        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3049            context.clone(),
3050            dag_state.clone(),
3051        ));
3052
3053        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3054        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3055        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3056        let (blocks_sender, _blocks_receiver) =
3057            monitored_mpsc::unbounded_channel("consensus_block_output");
3058        let transaction_certifier = TransactionCertifier::new(
3059            context.clone(),
3060            Arc::new(NoopBlockVerifier {}),
3061            dag_state.clone(),
3062            blocks_sender,
3063        );
3064        // Need at least one subscriber to the block broadcast channel.
3065        let _block_receiver = signal_receivers.block_broadcast_receiver();
3066
3067        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
3068        let commit_observer = CommitObserver::new(
3069            context.clone(),
3070            commit_consumer,
3071            dag_state.clone(),
3072            transaction_certifier.clone(),
3073            leader_schedule.clone(),
3074        )
3075        .await;
3076
3077        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
3078        let mut core = Core::new(
3079            context.clone(),
3080            leader_schedule,
3081            transaction_consumer,
3082            transaction_certifier.clone(),
3083            block_manager,
3084            commit_observer,
3085            signals,
3086            key_pairs.remove(context.own_index.value()).1,
3087            dag_state.clone(),
3088            false,
3089            round_tracker.clone(),
3090        );
3091
3092        // Use a large propagation delay to disable proposing.
3093        // This is done by accepting an own block at round 1000 into dag state and
3094        // then simulating a round tracker update of received rounds from probe, where
3095        // the low quorum round for own index should be calculated as round 0.
3096        let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
3097        transaction_certifier.add_voted_blocks(vec![(test_block.clone(), vec![])]);
3098        // Force accepting the block to dag state because its causal history is incomplete.
3099        dag_state.write().accept_block(test_block);
3100
3101        round_tracker.write().update_from_probe(
3102            vec![
3103                vec![0, 0, 0, 0],
3104                vec![0, 0, 0, 0],
3105                vec![0, 0, 0, 0],
3106                vec![0, 0, 0, 0],
3107            ],
3108            vec![
3109                vec![0, 0, 0, 0],
3110                vec![0, 0, 0, 0],
3111                vec![0, 0, 0, 0],
3112                vec![0, 0, 0, 0],
3113            ],
3114        );
3115
3116        // There is no proposal even with forced proposing.
3117        assert!(core.try_propose(true).unwrap().is_none());
3118
3119        // Let Core know there is no propagation delay.
3120        // This is done by simulating a round tracker update of received rounds from probe,
3121        // where the low quorum round for own index should be calculated as round 1000.
3122        round_tracker.write().update_from_probe(
3123            vec![
3124                vec![1000, 1000, 1000, 1000],
3125                vec![1000, 1000, 1000, 1000],
3126                vec![1000, 1000, 1000, 1000],
3127                vec![1000, 1000, 1000, 1000],
3128            ],
3129            vec![
3130                vec![1000, 1000, 1000, 1000],
3131                vec![1000, 1000, 1000, 1000],
3132                vec![1000, 1000, 1000, 1000],
3133                vec![1000, 1000, 1000, 1000],
3134            ],
3135        );
3136
3137        // Also add the necessary blocks from round 1000 so core will propose for
3138        // round 1001
3139        for author in 1..4 {
3140            let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
3141            transaction_certifier.add_voted_blocks(vec![(block.clone(), vec![])]);
3142            // Force accepting the block to dag state because its causal history is incomplete.
3143            dag_state.write().accept_block(block);
3144        }
3145
3146        // Proposing now would succeed.
3147        assert!(core.try_propose(true).unwrap().is_some());
3148    }
3149
3150    #[tokio::test(flavor = "current_thread", start_paused = true)]
3151    async fn test_leader_schedule_change() {
3152        telemetry_subscribers::init_for_testing();
3153        let default_params = Parameters::default();
3154
3155        let (context, _) = Context::new_for_test(4);
3156        // create the cores and their signals for all the authorities
3157        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3158
3159        // Now iterate over a few rounds and ensure the corresponding signals are created while network advances
3160        let mut last_round_blocks = Vec::new();
3161        for round in 1..=30 {
3162            let mut this_round_blocks = Vec::new();
3163
3164            // Wait for min round delay to allow blocks to be proposed.
3165            sleep(default_params.min_round_delay).await;
3166
3167            for core_fixture in &mut cores {
3168                // add the blocks from last round
3169                // this will trigger a block creation for the round and a signal should be emitted
3170                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3171
3172                core_fixture.core.round_tracker.write().update_from_probe(
3173                    vec![
3174                        vec![round, round, round, round],
3175                        vec![round, round, round, round],
3176                        vec![round, round, round, round],
3177                        vec![round, round, round, round],
3178                    ],
3179                    vec![
3180                        vec![round, round, round, round],
3181                        vec![round, round, round, round],
3182                        vec![round, round, round, round],
3183                        vec![round, round, round, round],
3184                    ],
3185                );
3186
3187                // A "new round" signal should be received given that all the blocks of previous round have been processed
3188                let new_round = receive(
3189                    Duration::from_secs(1),
3190                    core_fixture.signal_receivers.new_round_receiver(),
3191                )
3192                .await;
3193                assert_eq!(new_round, round);
3194
3195                // Check that a new block has been proposed.
3196                let extended_block = tokio::time::timeout(
3197                    Duration::from_secs(1),
3198                    core_fixture.block_receiver.recv(),
3199                )
3200                .await
3201                .unwrap()
3202                .unwrap();
3203                assert_eq!(extended_block.block.round(), round);
3204                assert_eq!(
3205                    extended_block.block.author(),
3206                    core_fixture.core.context.own_index
3207                );
3208
3209                // append the new block to this round blocks
3210                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3211
3212                let block = core_fixture.core.last_proposed_block();
3213
3214                // ensure that produced block is referring to the blocks of last_round
3215                assert_eq!(
3216                    block.ancestors().len(),
3217                    core_fixture.core.context.committee.size()
3218                );
3219                for ancestor in block.ancestors() {
3220                    if block.round() > 1 {
3221                        // don't bother with round 1 block which just contains the genesis blocks.
3222                        assert!(
3223                            last_round_blocks
3224                                .iter()
3225                                .any(|block| block.reference() == *ancestor),
3226                            "Reference from previous round should be added"
3227                        );
3228                    }
3229                }
3230            }
3231
3232            last_round_blocks = this_round_blocks;
3233        }
3234
3235        for core_fixture in cores {
3236            // Flush the DAG state to storage.
3237            core_fixture.dag_state.write().flush();
3238
3239            // Check commits have been persisted to store
3240            let last_commit = core_fixture
3241                .store
3242                .read_last_commit()
3243                .unwrap()
3244                .expect("last commit should be set");
3245            // There are 28 leader rounds with rounds completed up to and including
3246            // round 29. Round 30 blocks will only include their own blocks, so the
3247            // 28th leader will not be committed.
3248            assert_eq!(last_commit.index(), 27);
3249            let all_stored_commits = core_fixture
3250                .store
3251                .scan_commits((0..=CommitIndex::MAX).into())
3252                .unwrap();
3253            assert_eq!(all_stored_commits.len(), 27);
3254            assert_eq!(
3255                core_fixture
3256                    .core
3257                    .leader_schedule
3258                    .leader_swap_table
3259                    .read()
3260                    .bad_nodes
3261                    .len(),
3262                1
3263            );
3264            assert_eq!(
3265                core_fixture
3266                    .core
3267                    .leader_schedule
3268                    .leader_swap_table
3269                    .read()
3270                    .good_nodes
3271                    .len(),
3272                1
3273            );
3274            let expected_reputation_scores =
3275                ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3276            assert_eq!(
3277                core_fixture
3278                    .core
3279                    .leader_schedule
3280                    .leader_swap_table
3281                    .read()
3282                    .reputation_scores,
3283                expected_reputation_scores
3284            );
3285        }
3286    }
3287
3288    #[tokio::test]
3289    async fn test_filter_new_commits() {
3290        telemetry_subscribers::init_for_testing();
3291
3292        let (context, _key_pairs) = Context::new_for_test(4);
3293        let context = context.with_parameters(Parameters {
3294            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3295            ..Default::default()
3296        });
3297
3298        let authority_index = AuthorityIndex::new_for_test(0);
3299        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3300        let mut core = core.core;
3301
3302        // No new block should have been produced
3303        assert_eq!(
3304            core.last_proposed_round(),
3305            GENESIS_ROUND,
3306            "No block should have been created other than genesis"
3307        );
3308
3309        // create a DAG of 12 rounds
3310        let mut dag_builder = DagBuilder::new(core.context.clone());
3311        dag_builder.layers(1..=12).build();
3312
3313        // Store all blocks up to round 6 which should be enough to decide up to leader 4
3314        dag_builder.print();
3315        let blocks = dag_builder.blocks(1..=6);
3316
3317        for block in blocks {
3318            core.dag_state.write().accept_block(block);
3319        }
3320
3321        // Get all the committed sub dags up to round 10
3322        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3323
3324        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
3325        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3326
3327        // We should have committed up to round 4
3328        assert_eq!(committed_sub_dags.len(), 4);
3329
3330        // Now validate the certified commits. We'll try 3 different scenarios:
3331        println!("Case 1. Provide certified commits that are all before the last committed round.");
3332
3333        // Highest certified commit should be for leader of round 4.
3334        let certified_commits = sub_dags_and_commits
3335            .iter()
3336            .take(4)
3337            .map(|(_, c)| c)
3338            .cloned()
3339            .collect::<Vec<_>>();
3340        assert!(
3341            certified_commits.last().unwrap().index()
3342                <= committed_sub_dags.last().unwrap().commit_ref.index,
3343            "Highest certified commit should not be newer than the highest committed index."
3344        );
3345
3346        let certified_commits = core.filter_new_commits(certified_commits).unwrap();
3347
3348        // No commits should be processed
3349        assert!(certified_commits.is_empty());
3350
3351        println!("Case 2. Provide certified commits that are all after the last committed round.");
3352
3353        // Provide certified commits up to commit index 5.
3354        let certified_commits = sub_dags_and_commits
3355            .iter()
3356            .take(5)
3357            .map(|(_, c)| c.clone())
3358            .collect::<Vec<_>>();
3359
3360        let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
3361
3362        // The certified commit of index 5 should be processed.
3363        assert_eq!(certified_commits.len(), 1);
3364        assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3365
3366        println!(
3367            "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
3368        );
3369
3370        // Provide only the certified commit with index 6, skipping index 5.
3371        let certified_commits = sub_dags_and_commits
3372            .iter()
3373            .skip(5)
3374            .take(1)
3375            .map(|(_, c)| c.clone())
3376            .collect::<Vec<_>>();
3377
3378        let err = core
3379            .filter_new_commits(certified_commits.clone())
3380            .unwrap_err();
3381        match err {
3382            ConsensusError::UnexpectedCertifiedCommitIndex {
3383                expected_commit_index: 5,
3384                commit_index: 6,
3385            } => (),
3386            _ => panic!("Unexpected error: {:?}", err),
3387        }
3388    }
3389
3390    #[tokio::test]
3391    async fn test_add_certified_commits() {
3392        telemetry_subscribers::init_for_testing();
3393
3394        let (context, _key_pairs) = Context::new_for_test(4);
3395        let context = context.with_parameters(Parameters {
3396            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3397            ..Default::default()
3398        });
3399
3400        let authority_index = AuthorityIndex::new_for_test(0);
3401        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3402        let store = core.store.clone();
3403        let mut core = core.core;
3404
3405        // No new block should have been produced
3406        assert_eq!(
3407            core.last_proposed_round(),
3408            GENESIS_ROUND,
3409            "No block should have been created other than genesis"
3410        );
3411
3412        // create a DAG of 12 rounds
3413        let mut dag_builder = DagBuilder::new(core.context.clone());
3414        dag_builder.layers(1..=12).build();
3415
3416        // Store all blocks up to round 6 which should be enough to decide up to leader 4
3417        dag_builder.print();
3418        let blocks = dag_builder.blocks(1..=6);
3419
3420        for block in blocks {
3421            core.dag_state.write().accept_block(block);
3422        }
3423
3424        // Get all the committed sub dags up to round 10
3425        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3426
3427        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
3428        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3429
3430        // We should have committed up to round 4
3431        assert_eq!(committed_sub_dags.len(), 4);
3432
3433        // Flush the DAG state to storage.
3434        core.dag_state.write().flush();
3435
3436        println!("Case 1. Provide no certified commits. No commit should happen.");
3437
3438        let last_commit = store
3439            .read_last_commit()
3440            .unwrap()
3441            .expect("Last commit should be set");
3442        assert_eq!(last_commit.reference().index, 4);
3443
3444        println!(
3445            "Case 2. Provide certified commits that are before and after the last committed round, and also additional blocks so the direct decide rule can run as well."
3446        );
3447
3448        // The commits of leader rounds 5-8 should be committed via the certified commits.
3449        let certified_commits = sub_dags_and_commits
3450            .iter()
3451            .skip(3)
3452            .take(5)
3453            .map(|(_, c)| c.clone())
3454            .collect::<Vec<_>>();
3455
3456        // Now only add the blocks of rounds 8..=12. The blocks up to round 7 should be accepted via the certified commits processing.
3457        let blocks = dag_builder.blocks(8..=12);
3458        for block in blocks {
3459            core.dag_state.write().accept_block(block);
3460        }
3461
3462        // The corresponding blocks of the certified commits should be accepted and stored before linearizing and committing the DAG.
3463        core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3464            .expect("Should not fail");
3465
3466        // Flush the DAG state to storage.
3467        core.dag_state.write().flush();
3468
3469        let commits = store.scan_commits((6..=10).into()).unwrap();
3470
3471        // We expect all the sub dags up to leader round 10 to be committed.
3472        assert_eq!(commits.len(), 5);
3473
3474        for i in 6..=10 {
3475            let commit = &commits[i - 6];
3476            assert_eq!(commit.reference().index, i as u32);
3477        }
3478    }
3479
3480    #[tokio::test]
3481    async fn try_commit_with_certified_commits_gced_blocks() {
3482        const GC_DEPTH: u32 = 3;
3483        telemetry_subscribers::init_for_testing();
3484
3485        let (mut context, mut key_pairs) = Context::new_for_test(5);
3486        context
3487            .protocol_config
3488            .set_consensus_gc_depth_for_testing(GC_DEPTH);
3489        let context = Arc::new(context.with_parameters(Parameters {
3490            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3491            ..Default::default()
3492        }));
3493
3494        let store = Arc::new(MemStore::new());
3495        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3496
3497        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3498        let leader_schedule = Arc::new(
3499            LeaderSchedule::from_store(context.clone(), dag_state.clone())
3500                .with_num_commits_per_schedule(10),
3501        );
3502
3503        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3504        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3505        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3506        let (blocks_sender, _blocks_receiver) =
3507            monitored_mpsc::unbounded_channel("consensus_block_output");
3508        let transaction_certifier = TransactionCertifier::new(
3509            context.clone(),
3510            Arc::new(NoopBlockVerifier {}),
3511            dag_state.clone(),
3512            blocks_sender,
3513        );
3514        // Need at least one subscriber to the block broadcast channel.
3515        let _block_receiver = signal_receivers.block_broadcast_receiver();
3516
3517        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
3518        let commit_observer = CommitObserver::new(
3519            context.clone(),
3520            commit_consumer,
3521            dag_state.clone(),
3522            transaction_certifier.clone(),
3523            leader_schedule.clone(),
3524        )
3525        .await;
3526
3527        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
3528        let mut core = Core::new(
3529            context.clone(),
3530            leader_schedule,
3531            transaction_consumer,
3532            transaction_certifier.clone(),
3533            block_manager,
3534            commit_observer,
3535            signals,
3536            key_pairs.remove(context.own_index.value()).1,
3537            dag_state.clone(),
3538            true,
3539            round_tracker,
3540        );
3541
3542        // No new block should have been produced
3543        assert_eq!(
3544            core.last_proposed_round(),
3545            GENESIS_ROUND,
3546            "No block should have been created other than genesis"
3547        );
3548
3549        let dag_str = "DAG {
3550            Round 0 : { 5 },
3551            Round 1 : { * },
3552            Round 2 : { 
3553                A -> [-E1],
3554                B -> [-E1],
3555                C -> [-E1],
3556                D -> [-E1],
3557            },
3558            Round 3 : {
3559                A -> [*],
3560                B -> [*],
3561                C -> [*],
3562                D -> [*],
3563            },
3564            Round 4 : { 
3565                A -> [*],
3566                B -> [*],
3567                C -> [*],
3568                D -> [*],
3569            },
3570            Round 5 : { 
3571                A -> [*],
3572                B -> [*],
3573                C -> [*],
3574                D -> [*],
3575                E -> [A4, B4, C4, D4, E1]
3576            },
3577            Round 6 : { * },
3578            Round 7 : { * },
3579        }";
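        // In this DAG, the round 2 blocks of A-D explicitly exclude E1 as an ancestor ("-E1") and E
        // produces no further blocks until E5, which links E1 back in. With GC_DEPTH = 3, E1 is below
        // the GC round by the time it becomes reachable, so it should never end up in a committed sub dag.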
3580
3581        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3582        dag_builder.print();
3583
3584        // Now get all the committed sub dags from the DagBuilder
3585        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3586            .get_sub_dag_and_certified_commits(1..=5)
3587            .into_iter()
3588            .unzip();
3589
3590        // Now try to commit up to the latest leader (round = 5) with the provided certified commits. Note that we have not accepted any
3591        // blocks. That should happen during the commit process.
3592        let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3593
3594        // We should have committed up to round 4
3595        assert_eq!(committed_sub_dags.len(), 4);
3596        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3597            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3598
3599            // Ensure that the block from node E1 has not been committed.
3600            for block in committed_sub_dag.blocks.iter() {
3601                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
3602                    panic!("Did not expect to commit block E1");
3603                }
3604            }
3605        }
3606    }
3607
3608    #[tokio::test(flavor = "current_thread", start_paused = true)]
3609    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3610        parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
3611    }
3612
3613    #[tokio::test(flavor = "current_thread", start_paused = true)]
3614    async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
3615        parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
3616    }
3617
3618    async fn parameterized_test_commit_on_leader_schedule_change_boundary(
3619        num_leaders_per_round: Option<usize>,
3620    ) {
3621        telemetry_subscribers::init_for_testing();
3622        let default_params = Parameters::default();
3623
3624        let (mut context, _) = Context::new_for_test(6);
3625        context
3626            .protocol_config
3627            .set_mysticeti_num_leaders_per_round_for_testing(num_leaders_per_round);
3628        // create the cores and their signals for all the authorities
3629        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
3630
3631        // Now iterate over a few rounds and ensure the corresponding signals are created while network advances
3632        let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
3633        for round in 1..=33 {
3634            let mut this_round_blocks = Vec::new();
3635
3636            // Wait for min round delay to allow blocks to be proposed.
3637            sleep(default_params.min_round_delay).await;
3638
3639            for core_fixture in &mut cores {
3640                // add the blocks from last round
3641                // this will trigger a block creation for the round and a signal should be emitted
3642                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3643
3644                core_fixture.core.round_tracker.write().update_from_probe(
3645                    vec![
3646                        vec![round, round, round, round, round, round],
3647                        vec![round, round, round, round, round, round],
3648                        vec![round, round, round, round, round, round],
3649                        vec![round, round, round, round, round, round],
3650                        vec![round, round, round, round, round, round],
3651                        vec![round, round, round, round, round, round],
3652                    ],
3653                    vec![
3654                        vec![round, round, round, round, round, round],
3655                        vec![round, round, round, round, round, round],
3656                        vec![round, round, round, round, round, round],
3657                        vec![round, round, round, round, round, round],
3658                        vec![round, round, round, round, round, round],
3659                        vec![round, round, round, round, round, round],
3660                    ],
3661                );
3662
3663                // A "new round" signal should be received given that all the blocks of previous round have been processed
3664                let new_round = receive(
3665                    Duration::from_secs(1),
3666                    core_fixture.signal_receivers.new_round_receiver(),
3667                )
3668                .await;
3669                assert_eq!(new_round, round);
3670
3671                // Check that a new block has been proposed.
3672                let extended_block = tokio::time::timeout(
3673                    Duration::from_secs(1),
3674                    core_fixture.block_receiver.recv(),
3675                )
3676                .await
3677                .unwrap()
3678                .unwrap();
3679                assert_eq!(extended_block.block.round(), round);
3680                assert_eq!(
3681                    extended_block.block.author(),
3682                    core_fixture.core.context.own_index
3683                );
3684
3685                // append the new block to this round blocks
3686                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3687
3688                let block = core_fixture.core.last_proposed_block();
3689
3690                // ensure that produced block is referring to the blocks of last_round
3691                assert_eq!(
3692                    block.ancestors().len(),
3693                    core_fixture.core.context.committee.size()
3694                );
3695                for ancestor in block.ancestors() {
3696                    if block.round() > 1 {
3697                        // don't bother with round 1 block which just contains the genesis blocks.
3698                        assert!(
3699                            last_round_blocks
3700                                .iter()
3701                                .any(|block| block.reference() == *ancestor),
3702                            "Reference from previous round should be added"
3703                        );
3704                    }
3705                }
3706            }
3707
3708            last_round_blocks = this_round_blocks;
3709        }
3710
3711        for core_fixture in cores {
3712            // There are 31 leader rounds with rounds completed up to and including
3713            // round 33. Round 33 blocks will only include their own blocks, so there
3714            // should only be 30 commits.
3715            // However, on a leader schedule change boundary it is possible for a
3716            // new leader to get selected for the same round if the elected leader
3717            // gets swapped, allowing multiple leaders to be committed at a round.
3718            // This means that with multi leader per round explicitly set to 1 we will have 30
3719            // commits, otherwise 31.
3720            // NOTE: We used 31 leader rounds to specifically trigger the scenario
3721            // where the leader schedule boundary occurred AND we had a swap to a new
3722            // leader for the same round
3723            let expected_commit_count = match num_leaders_per_round {
3724                Some(1) => 30,
3725                _ => 31,
3726            };
3727
3728            // Flush the DAG state to storage.
3729            core_fixture.dag_state.write().flush();
3730
3731            // Check commits have been persisted to store
3732            let last_commit = core_fixture
3733                .store
3734                .read_last_commit()
3735                .unwrap()
3736                .expect("last commit should be set");
3737            assert_eq!(last_commit.index(), expected_commit_count);
3738            let all_stored_commits = core_fixture
3739                .store
3740                .scan_commits((0..=CommitIndex::MAX).into())
3741                .unwrap();
3742            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3743            assert_eq!(
3744                core_fixture
3745                    .core
3746                    .leader_schedule
3747                    .leader_swap_table
3748                    .read()
3749                    .bad_nodes
3750                    .len(),
3751                1
3752            );
3753            assert_eq!(
3754                core_fixture
3755                    .core
3756                    .leader_schedule
3757                    .leader_swap_table
3758                    .read()
3759                    .good_nodes
3760                    .len(),
3761                1
3762            );
3763            let expected_reputation_scores =
3764                ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3765            assert_eq!(
3766                core_fixture
3767                    .core
3768                    .leader_schedule
3769                    .leader_swap_table
3770                    .read()
3771                    .reputation_scores,
3772                expected_reputation_scores
3773            );
3774        }
3775    }
3776
3777    #[tokio::test]
3778    async fn test_core_signals() {
3779        telemetry_subscribers::init_for_testing();
3780        let default_params = Parameters::default();
3781
3782        let (context, _) = Context::new_for_test(4);
3783        // create the cores and their signals for all the authorities
3784        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3785
3786        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances.
3787        let mut last_round_blocks = Vec::new();
3788        for round in 1..=10 {
3789            let mut this_round_blocks = Vec::new();
3790
3791            // Wait for min round delay to allow blocks to be proposed.
3792            sleep(default_params.min_round_delay).await;
3793
3794            for core_fixture in &mut cores {
3795                // Add the blocks from the last round.
3796                // This will trigger a block creation for the round, and a signal should be emitted.
3797                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3798
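                // Feed the round tracker full probe data: every peer has received and accepted
                // all authorities' blocks up to `round`, presumably so that no propagation delay
                // is reported and proposing is never throttled in this test.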
3799                core_fixture.core.round_tracker.write().update_from_probe(
3800                    vec![
3801                        vec![round, round, round, round],
3802                        vec![round, round, round, round],
3803                        vec![round, round, round, round],
3804                        vec![round, round, round, round],
3805                    ],
3806                    vec![
3807                        vec![round, round, round, round],
3808                        vec![round, round, round, round],
3809                        vec![round, round, round, round],
3810                        vec![round, round, round, round],
3811                    ],
3812                );
3813
3814                // A "new round" signal should be received given that all the blocks of the previous round have been processed.
3815                let new_round = receive(
3816                    Duration::from_secs(1),
3817                    core_fixture.signal_receivers.new_round_receiver(),
3818                )
3819                .await;
3820                assert_eq!(new_round, round);
3821
3822                // Check that a new block has been proposed.
3823                let extended_block = tokio::time::timeout(
3824                    Duration::from_secs(1),
3825                    core_fixture.block_receiver.recv(),
3826                )
3827                .await
3828                .unwrap()
3829                .unwrap();
3830                assert_eq!(extended_block.block.round(), round);
3831                assert_eq!(
3832                    extended_block.block.author(),
3833                    core_fixture.core.context.own_index
3834                );
3835
3836                // Append the new block to this round's blocks.
3837                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3838
3839                let block = core_fixture.core.last_proposed_block();
3840
3841                // Ensure that the produced block refers to the blocks of last_round.
3842                assert_eq!(
3843                    block.ancestors().len(),
3844                    core_fixture.core.context.committee.size()
3845                );
3846                for ancestor in block.ancestors() {
3847                    if block.round() > 1 {
3848                        // Don't bother with round 1 blocks, which only reference the genesis blocks.
3849                        assert!(
3850                            last_round_blocks
3851                                .iter()
3852                                .any(|block| block.reference() == *ancestor),
3853                            "Reference from previous round should be added"
3854                        );
3855                    }
3856                }
3857            }
3858
3859            last_round_blocks = this_round_blocks;
3860        }
3861
3862        for core_fixture in cores {
3863            // Flush the DAG state to storage.
3864            core_fixture.dag_state.write().flush();
3865            // Check commits have been persisted to store
3866            let last_commit = core_fixture
3867                .store
3868                .read_last_commit()
3869                .unwrap()
3870                .expect("last commit should be set");
3871            // There are 8 leader rounds with rounds completed up to and including
3872            // round 9. Each core only has its own round 10 block, so the
3873            // 8th leader will not be committed.
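            // (Roughly: a leader at round r needs enough blocks from rounds r+1 and r+2 to be
            // directly committed, so with a shared DAG only up to round 9 the last committable
            // leader here is the one at round 7.)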
3874            assert_eq!(last_commit.index(), 7);
3875            let all_stored_commits = core_fixture
3876                .store
3877                .scan_commits((0..=CommitIndex::MAX).into())
3878                .unwrap();
3879            assert_eq!(all_stored_commits.len(), 7);
3880        }
3881    }
3882
3883    #[tokio::test]
3884    async fn test_core_compress_proposal_references() {
3885        telemetry_subscribers::init_for_testing();
3886        let default_params = Parameters::default();
3887
3888        let (context, _) = Context::new_for_test(4);
3889        // create the cores and their signals for all the authorities
3890        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3891
3892        let mut last_round_blocks = Vec::new();
3893        let mut all_blocks = Vec::new();
3894
3895        let excluded_authority = AuthorityIndex::new_for_test(3);
3896
3897        for round in 1..=10 {
3898            let mut this_round_blocks = Vec::new();
3899
3900            for core_fixture in &mut cores {
3901                // do not produce any block for authority 3
3902                if core_fixture.core.context.own_index == excluded_authority {
3903                    continue;
3904                }
3905
3906                // Try to propose, to ensure we also cover the case where blocks from leader authority 3 are missing.
3907                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
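                // As in test_core_signals, feed the round tracker full probe data so that
                // proposing is not throttled by a reported propagation delay.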
3908                core_fixture.core.round_tracker.write().update_from_probe(
3909                    vec![
3910                        vec![round, round, round, round],
3911                        vec![round, round, round, round],
3912                        vec![round, round, round, round],
3913                        vec![round, round, round, round],
3914                    ],
3915                    vec![
3916                        vec![round, round, round, round],
3917                        vec![round, round, round, round],
3918                        vec![round, round, round, round],
3919                        vec![round, round, round, round],
3920                    ],
3921                );
3922                core_fixture.core.new_block(round, true).unwrap();
3923
3924                let block = core_fixture.core.last_proposed_block();
3925                assert_eq!(block.round(), round);
3926
3927                // Append the new block to this round's blocks.
3928                this_round_blocks.push(block.clone());
3929            }
3930
3931            last_round_blocks = this_round_blocks.clone();
3932            all_blocks.extend(this_round_blocks);
3933        }
3934
3935        // Now send all the produced blocks to the core of authority 3. It should produce a new block. If no compression
3936        // were applied, then we would expect all the previous blocks from rounds 0..=10 to be referenced. However, since
3937        // compression is applied, only the last round's (10) blocks should be referenced, plus the authority's own block of round 1.
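        // (A round 10 ancestor transitively covers its author's earlier blocks, so referencing
        // the lower rounds again would add no new causal history to the proposal.)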
3938        let core_fixture = &mut cores[excluded_authority];
3939        // Wait for min round delay to allow blocks to be proposed.
3940        sleep(default_params.min_round_delay).await;
3941        // add blocks to trigger proposal.
3942        core_fixture.add_blocks(all_blocks).unwrap();
3943
3944        // Assert that a block has been created for round 11 and that it references blocks of round 10 for the other peers,
3945        // and its own block of round 1 (created after recovery).
3946        let block = core_fixture.core.last_proposed_block();
3947        assert_eq!(block.round(), 11);
3948        assert_eq!(block.ancestors().len(), 4);
3949        for block_ref in block.ancestors() {
3950            if block_ref.author == excluded_authority {
3951                assert_eq!(block_ref.round, 1);
3952            } else {
3953                assert_eq!(block_ref.round, 10);
3954            }
3955        }
3956
3957        // Flush the DAG state to storage.
3958        core_fixture.dag_state.write().flush();
3959
3960        // Check commits have been persisted to store
3961        let last_commit = core_fixture
3962            .store
3963            .read_last_commit()
3964            .unwrap()
3965            .expect("last commit should be set");
3966        // There are 8 leader rounds with rounds completed up to and including
3967        // round 10. However, because there were no blocks produced by authority 3,
3968        // 2 leader rounds will be skipped.
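        // (Authority 3 is presumably the elected leader for those rounds; with no blocks from
        // it, those leader slots end up skipped rather than committed.)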
3969        assert_eq!(last_commit.index(), 6);
3970        let all_stored_commits = core_fixture
3971            .store
3972            .scan_commits((0..=CommitIndex::MAX).into())
3973            .unwrap();
3974        assert_eq!(all_stored_commits.len(), 6);
3975    }
3976
3977    #[tokio::test]
3978    async fn try_select_certified_leaders() {
3979        // GIVEN
3980        telemetry_subscribers::init_for_testing();
3981
3982        let (context, _) = Context::new_for_test(4);
3983
3984        let authority_index = AuthorityIndex::new_for_test(0);
3985        let core =
3986            CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
3987        let mut core = core.core;
3988
3989        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
3990        dag_builder.layers(1..=12).build();
3991
3992        let limit = 2;
3993
3994        let blocks = dag_builder.blocks(1..=12);
3995
3996        for block in blocks {
3997            core.dag_state.write().accept_block(block);
3998        }
3999
4000        // WHEN
4001        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4002        let mut certified_commits = sub_dags_and_commits
4003            .into_iter()
4004            .map(|(_, commit)| commit)
4005            .collect::<Vec<_>>();
4006
4007        let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);
4008
4009        // THEN
4010        assert_eq!(leaders.len(), 2);
4011        assert_eq!(certified_commits.len(), 2);
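        // With 4 certified commits available and a limit of 2, only 2 leaders are selected and
        // the remaining 2 commits are left in `certified_commits` (presumably the earliest two
        // commits are consumed first).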
4012    }
4013
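    /// Waits up to `timeout` for `receiver` to observe a change and returns the latest value.
    /// Panics if the timeout elapses or if the sender side of the channel has been dropped.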
4014    pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4015        tokio::time::timeout(timeout, receiver.changed())
4016            .await
4017            .expect("Timeout while waiting to read from receiver")
4018            .expect("Signal receive channel shouldn't be closed");
4019        *receiver.borrow_and_update()
4020    }
4021}