// consensus_core/authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::Reverse,
6    collections::{BTreeMap, BTreeSet, BinaryHeap},
7    pin::Pin,
8    sync::Arc,
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockRef, Round};
16use futures::{Stream, StreamExt, ready, stream, task};
17use mysten_metrics::spawn_monitored_task;
18use parking_lot::RwLock;
19use rand::seq::SliceRandom as _;
20use sui_macros::fail_point_async;
21use tap::TapFallible;
22use tokio::sync::broadcast;
23use tokio_util::sync::ReusableBoxFuture;
24use tracing::{debug, info, warn};
25
26use crate::{
27    CommitIndex,
28    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
29    block_verifier::BlockVerifier,
30    commit::{CommitAPI as _, CommitRange, TrustedCommit},
31    commit_vote_monitor::CommitVoteMonitor,
32    context::Context,
33    core_thread::CoreThreadDispatcher,
34    dag_state::DagState,
35    error::{ConsensusError, ConsensusResult},
36    network::{
37        BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream, ObserverNetworkService,
38        PeerId, ValidatorNetworkService,
39    },
40    round_tracker::RoundTracker,
41    stake_aggregator::{QuorumThreshold, StakeAggregator},
42    storage::Store,
43    synchronizer::SynchronizerHandle,
44    transaction_certifier::TransactionCertifier,
45};
46
/// Multiplier applied to `commit_sync_batch_size` to derive the maximum allowed lag
/// between the local commit index and the observed quorum commit index before
/// incoming blocks are rejected (see `handle_send_block`).
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
48
/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    // Shared node context: committee, parameters, protocol config, metrics and clock.
    context: Arc<Context>,
    // Observes commit votes carried by received blocks to estimate the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Verifies received blocks and produces this node's transaction reject votes.
    block_verifier: Arc<dyn BlockVerifier>,
    // Schedules background fetches of missing blocks from peers.
    synchronizer: Arc<SynchronizerHandle>,
    // Dispatches verified blocks to the Core thread for acceptance into the DAG.
    core_dispatcher: Arc<C>,
    // Source of locally proposed blocks, resubscribed per peer block subscription.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    // Counts active block-stream subscriptions per peer and updates metrics.
    subscription_counter: Arc<SubscriptionCounter>,
    // Records this node's votes on transactions of received blocks.
    transaction_certifier: TransactionCertifier,
    // In-memory DAG state: cached blocks, last proposed block and last commit index.
    dag_state: Arc<RwLock<DagState>>,
    // Persistent storage for blocks, commits and commit votes.
    store: Arc<dyn Store>,
    // Tracks received/accepted rounds per authority, updated from received blocks.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
63
impl<C: CoreThreadDispatcher> AuthorityService<C> {
    /// Creates the service from shared handles to the node's subsystems.
    /// A fresh `SubscriptionCounter` is created from the given context.
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        round_tracker: Arc<RwLock<RoundTracker>>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
        transaction_certifier: TransactionCertifier,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcast,
            subscription_counter,
            transaction_certifier,
            dag_state,
            store,
            round_tracker,
        }
    }

    // Parses and validates serialized excluded ancestors.
    //
    // Entries beyond 2x committee size are silently dropped (not an error). Each
    // remaining entry must decode to a `BlockRef` with a valid authority index and a
    // round strictly lower than the round of the block that carried it; any
    // malformed or invalid entry fails the whole call. Per-authority and per-peer
    // metrics are updated for the accepted entries.
    fn parse_excluded_ancestors(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        mut excluded_ancestors: Vec<Vec<u8>>,
    ) -> ConsensusResult<Vec<BlockRef>> {
        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // Cap the number of excluded ancestors to bound work and memory per block.
        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        let excluded_ancestors = excluded_ancestors
            .into_iter()
            .map(|serialized| {
                let block_ref: BlockRef =
                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
                if !self.context.committee.is_valid_index(block_ref.author) {
                    return Err(ConsensusError::InvalidAuthorityIndex {
                        index: block_ref.author,
                        max: self.context.committee.size(),
                    });
                }
                // An ancestor must be from an earlier round than the carrying block.
                if block_ref.round >= block.round() {
                    return Err(ConsensusError::InvalidAncestorRound {
                        ancestor: block_ref.round,
                        block: block.round(),
                    });
                }
                Ok(block_ref)
            })
            .collect::<ConsensusResult<Vec<BlockRef>>>()?;

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }
        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        Ok(excluded_ancestors)
    }
}
157
#[async_trait]
impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
    /// Handles a block pushed by `peer`.
    ///
    /// Verifies authorship and block validity, observes commit votes and round
    /// information, rejects the block when the local commit index lags the quorum
    /// commit index by more than `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`,
    /// records own transaction votes, then submits the block to Core. Missing
    /// ancestors and missing excluded ancestors are scheduled for background
    /// fetching from the sending peer.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing parsing and validations.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // Record how far ahead of local time the block's timestamp is.
        // NOTE(review): the drift is only recorded as a metric here; blocks with
        // future timestamps are not delayed or rejected at this point — confirm
        // this is intentional.
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the block for the commit votes. When local commit is lagging too much,
        // commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Update own received rounds and peer accepted rounds from this verified block.
        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject blocks when local commit index is lagging too far from quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // The block is verified and current, so record own votes on the block
        // before sending the block to Core.
        if self.context.protocol_config.transaction_voting_enabled() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Send the block to Core to try accepting it into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Schedule fetching missing soft links from this peer in the background.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }

    /// Handles a subscription from `peer` to this node's proposed blocks.
    ///
    /// Returns a stream that first replays own cached blocks with rounds above
    /// `last_received` (or at least the last proposed block, for liveness), then
    /// yields newly broadcasted blocks.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        // Find past proposed blocks as the initial blocks to send to the peer.
        //
        // If there are cached blocks in the range which the peer requested, send all of them.
        // The size is bounded by the local GC round and DagState cache size.
        //
        // Otherwise if there is no cached block in the range which the peer requested,
        // and this node has proposed blocks before, at least one block should be sent to the peer
        // to help with liveness.
        let past_proposed_blocks = {
            let dag_state = self.dag_state.read();

            let mut proposed_blocks =
                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
            if proposed_blocks.is_empty() {
                let last_proposed_block = dag_state.get_last_proposed_block();
                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
                    vec![last_proposed_block]
                } else {
                    vec![]
                };
            }
            // Replayed blocks carry no excluded ancestors.
            stream::iter(
                proposed_blocks
                    .into_iter()
                    .map(|block| ExtendedSerializedBlock {
                        block: block.serialized().clone(),
                        excluded_ancestors: vec![],
                    }),
            )
        };

        let broadcasted_blocks = BroadcastedBlockStream::new(
            PeerId::Validator(peer),
            self.rx_block_broadcast.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
        Ok(Box::pin(past_proposed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles 3 types of requests:
    // 1. Live sync:
    //    - Both missing block refs and highest accepted rounds are specified.
    //    - fetch_missing_ancestors is true.
    //    - response returns max_blocks_per_sync blocks.
    // 2. Periodic sync:
    //    - Highest accepted rounds must be specified.
    //    - Missing block refs are optional.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    // 3. Commit sync:
    //    - Missing block refs are specified.
    //    - Highest accepted rounds are empty.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // Validate the request shape against the 3 supported request types above.
        if block_refs.is_empty() && (fetch_missing_ancestors || fetch_after_rounds.is_empty()) {
            return Err(ConsensusError::InvalidFetchBlocksRequest("When no block refs are provided, fetch_after_rounds must be provided and fetch_missing_ancestors must be false".to_string()));
        }
        // When provided, fetch_after_rounds must have one entry per committee member.
        if !fetch_after_rounds.is_empty()
            && fetch_after_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                fetch_after_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Finds the suitable limit of # of blocks to return.
        let max_response_num_blocks = if !fetch_after_rounds.is_empty() && !block_refs.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get the requested blocks first. Refs not found locally are silently skipped.
        let mut blocks = self
            .dag_state
            .read()
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        // When fetch_missing_ancestors is true, fetch missing ancestors of the requested blocks.
        // Otherwise, fetch additional blocks depth-first from the requested block authorities.
        if blocks.len() < max_response_num_blocks && !fetch_after_rounds.is_empty() {
            if fetch_missing_ancestors {
                // Get unique missing ancestor blocks of the requested blocks (validated to be non-empty).
                // fetch_after_rounds will only be used to filter out already accepted blocks.
                let missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| fetch_after_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks
                    .saturating_sub(blocks.len())
                    .min(missing_ancestors.len());
                if selected_num_blocks > 0 {
                    let selected_ancestor_refs = missing_ancestors
                        .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                    let ancestor_blocks = self
                        .dag_state
                        .read()
                        .get_blocks(&selected_ancestor_refs)
                        .into_iter()
                        .flatten();
                    blocks.extend(ancestor_blocks);
                }
            } else {
                // Get additional blocks from authorities with missing block.
                // Compute the fetch round per requested authority, or all authorities.
                let mut limit_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                if block_refs.is_empty() {
                    let dag_state = self.dag_state.read();
                    for (index, _authority) in self.context.committee.authorities() {
                        let last_block = dag_state.get_last_block_for_authority(index);
                        limit_rounds.insert(index, last_block.round());
                    }
                } else {
                    // Per authority, stop before the lowest round among its requested refs.
                    for block_ref in &block_refs {
                        let entry = limit_rounds
                            .entry(block_ref.author)
                            .or_insert(block_ref.round);
                        *entry = (*entry).min(block_ref.round);
                    }
                }

                // Use a min-heap to fetch blocks across authorities in ascending round order.
                // Each entry is (fetch_start_round, authority, limit_round).
                let mut heap = BinaryHeap::new();
                for (authority, limit_round) in &limit_rounds {
                    let fetch_start = fetch_after_rounds[*authority] + 1;
                    if fetch_start < *limit_round {
                        heap.push(Reverse((fetch_start, *authority, *limit_round)));
                    }
                }

                // Pop the lowest pending round, scan one block for that authority,
                // and re-push the authority with the next start round if still in range.
                while let Some(Reverse((fetch_start, authority, limit_round))) = heap.pop() {
                    let fetched = self.store.scan_blocks_by_author_in_range(
                        authority,
                        fetch_start,
                        limit_round,
                        1,
                    )?;
                    if let Some(block) = fetched.into_iter().next() {
                        let next_start = block.round() + 1;
                        blocks.push(block);
                        if blocks.len() >= max_response_num_blocks {
                            blocks.truncate(max_response_num_blocks);
                            break;
                        }
                        if next_start < limit_round {
                            heap.push(Reverse((next_start, authority, limit_round)));
                        }
                    }
                }
            }
        }

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

    /// Handles a request to fetch commits in `commit_range`.
    ///
    /// Scans at most `commit_sync_batch_size` commits starting at the range start,
    /// drops trailing commits whose stored votes do not reach a quorum, and returns
    /// the remaining commits together with the blocks carrying the votes that
    /// certify the last returned commit.
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        // Walk backwards from the last scanned commit until one with a quorum of
        // votes is found; uncertified trailing commits are dropped from the response.
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

    /// Returns the serialized last known block for each requested authority,
    /// excluding genesis blocks. Rejects requests with too many or invalid
    /// authority indices.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that those are valid authorities
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended blocks. Ideally we
        // want in the future if we think we would like to tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // no reason to serve back the genesis block - it's equal as if it has not received any block
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialised blocks
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    /// Returns this node's per-authority highest received rounds (from the round
    /// tracker) and highest accepted rounds (from cached blocks in dag state).
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}
670
671#[async_trait]
672impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
673    async fn handle_stream_blocks(
674        &self,
675        _peer: NodeId,
676        _highest_round_per_authority: Vec<u64>,
677    ) -> ConsensusResult<ObserverBlockStream> {
678        // TODO: Implement observer block streaming
679        todo!("Observer block streaming not yet implemented")
680    }
681
682    async fn handle_fetch_blocks(
683        &self,
684        _peer: NodeId,
685        _block_refs: Vec<BlockRef>,
686    ) -> ConsensusResult<Vec<Bytes>> {
687        // TODO: implement observer fetch blocks, similar to validator fetch_blocks but
688        // without highest_accepted_rounds.
689        Err(ConsensusError::NetworkRequest(
690            "Observer fetch blocks not yet implemented".to_string(),
691        ))
692    }
693
694    async fn handle_fetch_commits(
695        &self,
696        _peer: NodeId,
697        _commit_range: CommitRange,
698    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
699        // TODO: implement observer fetch commits, similar to validator fetch_commits.
700        Err(ConsensusError::NetworkRequest(
701            "Observer fetch commits not yet implemented".to_string(),
702        ))
703    }
704}
/// Subscription counts guarded by the mutex in `SubscriptionCounter`.
struct Counter {
    // Total number of active subscriptions across all peers.
    count: usize,
    // Number of active subscriptions per peer.
    subscriptions_by_peer: BTreeMap<PeerId, usize>,
}
709
/// Atomically counts the number of active subscriptions to the block broadcast stream.
pub(crate) struct SubscriptionCounter {
    // Provides access to the committee and the `subscribed_by` metric.
    context: Arc<Context>,
    // Interior-mutable counts; all updates happen under this mutex.
    counter: parking_lot::Mutex<Counter>,
}
715
impl SubscriptionCounter {
    /// Creates a counter with zero subscriptions and resets the `subscribed_by`
    /// gauge to 0 for every committee member.
    pub(crate) fn new(context: Arc<Context>) -> Self {
        // Set the subscribed peers by default to 0
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_peer: BTreeMap::new(),
            }),
            context,
        }
    }

    /// Registers a new subscription from `peer` and updates the `subscribed_by`
    /// metric: validators are tracked as a 0/1 gauge per hostname, observers are
    /// counted together under the "observer" label.
    fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        *counter
            .subscriptions_by_peer
            .entry(peer.clone())
            .or_insert(0) += 1;

        match peer {
            PeerId::Validator(authority) => {
                let peer_hostname = &self.context.committee.authority(*authority).hostname;
                self.context
                    .metrics
                    .node_metrics
                    .subscribed_by
                    .with_label_values(&[peer_hostname])
                    .set(1);
            }
            PeerId::Observer(_) => {
                self.context
                    .metrics
                    .node_metrics
                    .subscribed_by
                    .with_label_values(&["observer"])
                    .inc();
            }
        }

        Ok(())
    }

    /// Unregisters a subscription from `peer`, clearing the metric when the
    /// peer's subscription count drops to zero.
    ///
    /// NOTE(review): assumes a matching `increment()` happened first; otherwise
    /// the `usize` subtractions below underflow (panicking in debug builds).
    /// Confirm callers only invoke this when a subscription stream is dropped.
    fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        *counter
            .subscriptions_by_peer
            .entry(peer.clone())
            .or_insert(0) -= 1;

        if counter.subscriptions_by_peer[peer] == 0 {
            match peer {
                PeerId::Validator(authority) => {
                    let peer_hostname = &self.context.committee.authority(*authority).hostname;
                    self.context
                        .metrics
                        .node_metrics
                        .subscribed_by
                        .with_label_values(&[peer_hostname])
                        .set(0);
                }
                PeerId::Observer(_) => {
                    self.context
                        .metrics
                        .node_metrics
                        .subscribed_by
                        .with_label_values(&["observer"])
                        .dec();
                }
            }
        }

        Ok(())
    }
}
801
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
/// Lagging subscribers skip missed messages (with a warning) instead of erroring.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
805
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
pub(crate) struct BroadcastStream<T> {
    // Peer this stream delivers to; used for logging and subscription accounting.
    peer: PeerId,
    // Stores the receiver across poll_next() calls.
    // The future resolves to the received item plus the receiver itself, so the
    // receiver can be re-armed for the next poll.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}
821
822impl<T: 'static + Clone + Send> BroadcastStream<T> {
823    pub fn new(
824        peer: PeerId,
825        rx: broadcast::Receiver<T>,
826        subscription_counter: Arc<SubscriptionCounter>,
827    ) -> Self {
828        if let Err(err) = subscription_counter.increment(&peer) {
829            match err {
830                ConsensusError::Shutdown => {}
831                _ => panic!("Unexpected error: {err}"),
832            }
833        }
834        Self {
835            peer,
836            inner: ReusableBoxFuture::new(make_recv_future(rx)),
837            subscription_counter,
838        }
839    }
840}
841
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    /// Polls the wrapped broadcast receiver for the next item.
    ///
    /// A lagged receiver does not yield an error: the lag is logged and polling
    /// continues with the next available message. The stream yields `None` only
    /// when the broadcast channel is closed.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer.clone();
        let maybe_item = loop {
            // Drive the stored recv future, then re-arm it with the receiver
            // it hands back so the next poll can keep receiving.
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    // Tolerate lag: skip the dropped messages and keep polling.
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}
872
873impl<T> Drop for BroadcastStream<T> {
874    fn drop(&mut self) {
875        if let Err(err) = self.subscription_counter.decrement(&self.peer) {
876            match err {
877                ConsensusError::Shutdown => {}
878                _ => panic!("Unexpected error: {err}"),
879            }
880        }
881    }
882}
883
884async fn make_recv_future<T: Clone>(
885    mut rx: broadcast::Receiver<T>,
886) -> (
887    Result<T, broadcast::error::RecvError>,
888    broadcast::Receiver<T>,
889) {
890    let result = rx.recv().await;
891    (result, rx)
892}
893
894// TODO: add a unit test for BroadcastStream.
895
896#[cfg(test)]
897mod tests {
898    use std::{
899        collections::{BTreeMap, BTreeSet},
900        sync::Arc,
901        time::Duration,
902    };
903
904    use async_trait::async_trait;
905    use bytes::Bytes;
906    use consensus_config::AuthorityIndex;
907    use consensus_types::block::{BlockDigest, BlockRef, Round};
908    use parking_lot::{Mutex, RwLock};
909    use tokio::{sync::broadcast, time::sleep};
910
911    use futures::StreamExt as _;
912
913    use crate::{
914        authority_service::AuthorityService,
915        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
916        commit::{CertifiedCommits, CommitRange},
917        commit_vote_monitor::CommitVoteMonitor,
918        context::Context,
919        core_thread::{CoreError, CoreThreadDispatcher},
920        dag_state::DagState,
921        error::ConsensusResult,
922        network::{
923            BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
924            ValidatorNetworkClient, ValidatorNetworkService,
925        },
926        round_tracker::RoundTracker,
927        storage::mem_store::MemStore,
928        synchronizer::Synchronizer,
929        test_dag_builder::DagBuilder,
930        transaction_certifier::TransactionCertifier,
931    };
/// Test double for `CoreThreadDispatcher` that records every block passed to
/// `add_blocks()` for later inspection.
struct FakeCoreThreadDispatcher {
    // Blocks received via `add_blocks`, in arrival order.
    blocks: Mutex<Vec<VerifiedBlock>>,
}
935
936    impl FakeCoreThreadDispatcher {
937        fn new() -> Self {
938            Self {
939                blocks: Mutex::new(vec![]),
940            }
941        }
942
943        fn get_blocks(&self) -> Vec<VerifiedBlock> {
944            self.blocks.lock().clone()
945        }
946    }
947
948    #[async_trait]
949    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
950        async fn add_blocks(
951            &self,
952            blocks: Vec<VerifiedBlock>,
953        ) -> Result<BTreeSet<BlockRef>, CoreError> {
954            let block_refs = blocks.iter().map(|b| b.reference()).collect();
955            self.blocks.lock().extend(blocks);
956            Ok(block_refs)
957        }
958
959        async fn check_block_refs(
960            &self,
961            _block_refs: Vec<BlockRef>,
962        ) -> Result<BTreeSet<BlockRef>, CoreError> {
963            Ok(BTreeSet::new())
964        }
965
966        async fn add_certified_commits(
967            &self,
968            _commits: CertifiedCommits,
969        ) -> Result<BTreeSet<BlockRef>, CoreError> {
970            todo!()
971        }
972
973        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
974            Ok(())
975        }
976
977        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
978            Ok(Default::default())
979        }
980
981        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
982            todo!()
983        }
984
985        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
986            todo!()
987        }
988    }
989
// Test double implementing both network client traits; every method panics,
// as the tests below never make outbound network calls through it.
#[derive(Default)]
struct FakeNetworkClient {}
992
993    #[async_trait]
994    impl ValidatorNetworkClient for FakeNetworkClient {
995        async fn send_block(
996            &self,
997            _peer: AuthorityIndex,
998            _block: &VerifiedBlock,
999            _timeout: Duration,
1000        ) -> ConsensusResult<()> {
1001            unimplemented!("Unimplemented")
1002        }
1003
1004        async fn subscribe_blocks(
1005            &self,
1006            _peer: AuthorityIndex,
1007            _last_received: Round,
1008            _timeout: Duration,
1009        ) -> ConsensusResult<BlockStream> {
1010            unimplemented!("Unimplemented")
1011        }
1012
1013        async fn fetch_blocks(
1014            &self,
1015            _peer: AuthorityIndex,
1016            _block_refs: Vec<BlockRef>,
1017            _fetch_after_rounds: Vec<Round>,
1018            _fetch_missing_ancestors: bool,
1019            _timeout: Duration,
1020        ) -> ConsensusResult<Vec<Bytes>> {
1021            unimplemented!("Unimplemented")
1022        }
1023
1024        async fn fetch_commits(
1025            &self,
1026            _peer: AuthorityIndex,
1027            _commit_range: CommitRange,
1028            _timeout: Duration,
1029        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1030            unimplemented!("Unimplemented")
1031        }
1032
1033        async fn fetch_latest_blocks(
1034            &self,
1035            _peer: AuthorityIndex,
1036            _authorities: Vec<AuthorityIndex>,
1037            _timeout: Duration,
1038        ) -> ConsensusResult<Vec<Bytes>> {
1039            unimplemented!("Unimplemented")
1040        }
1041
1042        async fn get_latest_rounds(
1043            &self,
1044            _peer: AuthorityIndex,
1045            _timeout: Duration,
1046        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1047            unimplemented!("Unimplemented")
1048        }
1049    }
1050
1051    #[async_trait]
1052    impl ObserverNetworkClient for FakeNetworkClient {
1053        async fn stream_blocks(
1054            &self,
1055            _peer: crate::network::PeerId,
1056            _highest_round_per_authority: Vec<u64>,
1057            _timeout: Duration,
1058        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1059            unimplemented!("Unimplemented")
1060        }
1061
1062        async fn fetch_blocks(
1063            &self,
1064            _peer: crate::network::PeerId,
1065            _block_refs: Vec<BlockRef>,
1066            _timeout: Duration,
1067        ) -> ConsensusResult<Vec<Bytes>> {
1068            unimplemented!("Unimplemented")
1069        }
1070
1071        async fn fetch_commits(
1072            &self,
1073            _peer: crate::network::PeerId,
1074            _commit_range: CommitRange,
1075            _timeout: Duration,
1076        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1077            unimplemented!("Unimplemented")
1078        }
1079    }
1080
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        // Wire an AuthorityService to a fake core dispatcher and fake network
        // clients so blocks accepted by the service can be inspected directly.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier =
            TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        // The block timestamp is exactly max_drift ahead of the local clock.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Handle the block on a separate task, since handling may wait out the drift.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        // With start_paused, tokio auto-advances the paused clock when all tasks
        // are idle, so any drift delay elapses without real waiting.
        sleep(max_drift / 2).await;

        // The drifted block should have been accepted and forwarded to the dispatcher.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block.
        // Author index 1000 is out of range for a 4-authority committee, so
        // handling should fail.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors.
        // A ref with an out-of-range author, undecodable bytes, and a ref to the
        // invalid block above — the request should be rejected.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1194
1195    #[tokio::test(flavor = "current_thread", start_paused = true)]
1196    async fn test_handle_fetch_blocks() {
1197        // GIVEN
1198        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
1199        const NUM_AUTHORITIES: usize = 40;
1200        const NUM_ROUNDS: usize = 40;
1201        let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1202        context.parameters.max_blocks_per_fetch = 50;
1203        let context = Arc::new(context);
1204        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1205        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1206        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1207        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1208        let fake_client = Arc::new(FakeNetworkClient::default());
1209        let network_client = Arc::new(SynchronizerClient::new(
1210            context.clone(),
1211            Some(fake_client.clone()),
1212            Some(fake_client.clone()),
1213        ));
1214        let store = Arc::new(MemStore::new());
1215        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1216        let transaction_certifier =
1217            TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
1218        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1219        let synchronizer = Synchronizer::start(
1220            network_client,
1221            context.clone(),
1222            core_dispatcher.clone(),
1223            commit_vote_monitor.clone(),
1224            block_verifier.clone(),
1225            transaction_certifier.clone(),
1226            round_tracker.clone(),
1227            dag_state.clone(),
1228            false,
1229        );
1230        let authority_service = Arc::new(AuthorityService::new(
1231            context.clone(),
1232            block_verifier,
1233            commit_vote_monitor,
1234            round_tracker,
1235            synchronizer,
1236            core_dispatcher.clone(),
1237            rx_block_broadcast,
1238            transaction_certifier,
1239            dag_state.clone(),
1240            store,
1241        ));
1242
1243        // GIVEN: 40 rounds of blocks in the dag state.
1244        let mut dag_builder = DagBuilder::new(context.clone());
1245        dag_builder
1246            .layers(1..=(NUM_ROUNDS as u32))
1247            .build()
1248            .persist_layers(dag_state.clone());
1249        dag_state.write().flush();
1250        let all_blocks = dag_builder.all_blocks();
1251
1252        // WHEN: Request 2 blocks from round 40, fetch missing ancestors enabled.
1253        let missing_block_refs: Vec<BlockRef> = all_blocks
1254            .iter()
1255            .rev()
1256            .take(2)
1257            .map(|b| b.reference())
1258            .collect();
1259        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1260        let results = authority_service
1261            .handle_fetch_blocks(
1262                AuthorityIndex::new_for_test(0),
1263                missing_block_refs.clone(),
1264                highest_accepted_rounds,
1265                true,
1266            )
1267            .await
1268            .unwrap();
1269
1270        // THEN: the expected number of unique blocks are returned.
1271        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1272            .iter()
1273            .map(|b| {
1274                let signed = bcs::from_bytes(b).unwrap();
1275                let block = VerifiedBlock::new_verified(signed, b.clone());
1276                (block.reference(), block)
1277            })
1278            .collect();
1279        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1280        // All missing blocks are returned.
1281        for b in &missing_block_refs {
1282            assert!(blocks.contains_key(b));
1283        }
1284        let num_missing_ancestors = blocks
1285            .keys()
1286            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1287            .count();
1288        assert_eq!(
1289            num_missing_ancestors,
1290            context.parameters.max_blocks_per_sync - missing_block_refs.len()
1291        );
1292
1293        // WHEN: Request 2 blocks from round 37, fetch missing ancestors disabled.
1294        let missing_round = NUM_ROUNDS as Round - 3;
1295        let missing_block_refs: Vec<BlockRef> = all_blocks
1296            .iter()
1297            .filter(|b| b.reference().round == missing_round)
1298            .map(|b| b.reference())
1299            .take(2)
1300            .collect();
1301        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1302        // Try to fill up the blocks from the 1st authority in missing_block_refs.
1303        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1304        let results = authority_service
1305            .handle_fetch_blocks(
1306                AuthorityIndex::new_for_test(0),
1307                missing_block_refs.clone(),
1308                highest_accepted_rounds,
1309                false,
1310            )
1311            .await
1312            .unwrap();
1313
1314        // THEN: the expected number of unique blocks are returned.
1315        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1316            .iter()
1317            .map(|b| {
1318                let signed = bcs::from_bytes(b).unwrap();
1319                let block = VerifiedBlock::new_verified(signed, b.clone());
1320                (block.reference(), block)
1321            })
1322            .collect();
1323        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1324        // All missing blocks are returned.
1325        for b in &missing_block_refs {
1326            assert!(blocks.contains_key(b));
1327        }
1328        // Ancestor blocks are from the expected rounds and authorities.
1329        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1330        for b in blocks.keys() {
1331            assert!(b.round <= missing_round);
1332            assert!(expected_authors.contains(&b.author));
1333        }
1334
1335        // WHEN: Request with empty block_refs, fetch missing ancestors disabled.
1336        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1337        // Set a few authorities to higher accepted rounds.
1338        highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
1339        highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
1340        let results = authority_service
1341            .handle_fetch_blocks(
1342                AuthorityIndex::new_for_test(0),
1343                vec![],
1344                highest_accepted_rounds.clone(),
1345                false,
1346            )
1347            .await
1348            .unwrap();
1349
1350        // THEN: the expected number of unique blocks are returned.
1351        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1352            .iter()
1353            .map(|b| {
1354                let signed = bcs::from_bytes(b).unwrap();
1355                let block = VerifiedBlock::new_verified(signed, b.clone());
1356                (block.reference(), block)
1357            })
1358            .collect();
1359        assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
1360        // Blocks should be from all authorities, within the expected round range.
1361        for block_ref in blocks.keys() {
1362            let accepted = highest_accepted_rounds[block_ref.author];
1363            assert!(block_ref.round > accepted);
1364        }
1365        // Blocks should be fetched in ascending round order across authorities,
1366        // so blocks should have low rounds near the accepted rounds.
1367        let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
1368        // With 40 authorities mostly at accepted round 1 and max_blocks_per_fetch=50,
1369        // the min-heap fills ~1-2 rounds per authority.
1370        assert!(
1371            max_round_in_result <= 4,
1372            "Expected low rounds from fair round-order fetching, got max round {}",
1373            max_round_in_result
1374        );
1375
1376        // WHEN: Request 5 block from round 40, not getting ancestors.
1377        let missing_block_refs: Vec<BlockRef> = all_blocks
1378            .iter()
1379            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1380            .map(|b| b.reference())
1381            .take(5)
1382            .collect();
1383        let results = authority_service
1384            .handle_fetch_blocks(
1385                AuthorityIndex::new_for_test(0),
1386                missing_block_refs.clone(),
1387                vec![],
1388                false,
1389            )
1390            .await
1391            .unwrap();
1392
1393        // THEN: the expected number of unique blocks are returned.
1394        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1395            .iter()
1396            .map(|b| {
1397                let signed = bcs::from_bytes(b).unwrap();
1398                let block = VerifiedBlock::new_verified(signed, b.clone());
1399                (block.reference(), block)
1400            })
1401            .collect();
1402        assert_eq!(blocks.len(), 5);
1403        for b in &missing_block_refs {
1404            assert!(blocks.contains_key(b));
1405        }
1406    }
1407
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        // An AuthorityService over a DAG of 10 rounds, where authority 2 also
        // equivocates (two blocks per round).
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier =
            TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        // Ask for the latest blocks of authorities 1 and 2 on behalf of peer 1.
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        // Every returned block should be from the highest round (10); the
        // equivocating authority may contribute more than one such block.
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1480
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier =
            TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
        // Accepted into the dag state before the service is created, so they are
        // available to serve to a new subscriber.
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
        // Should return last proposed block (round 15) as fallback
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 100)
                .await
                .unwrap();
            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(
                block.round(),
                15,
                "Should return last proposed block as fallback"
            );
            assert_eq!(block.author().value(), 0);
        }

        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
        // Should return cached blocks from round 8+
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 7)
                .await
                .unwrap();

            // Blocks are replayed in round order: 10 first, then 15.
            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block1.round(), 10, "Should return block at round 10");

            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block2.round(), 15, "Should return block at round 15");
        }
    }
1569
1570    #[tokio::test(flavor = "current_thread", start_paused = true)]
1571    async fn test_handle_subscribe_blocks_not_proposed() {
1572        let (context, _keys) = Context::new_for_test(4);
1573        let context = Arc::new(context);
1574        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1575        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1576        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1577        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1578        let fake_client = Arc::new(FakeNetworkClient::default());
1579        let network_client = Arc::new(SynchronizerClient::new(
1580            context.clone(),
1581            Some(fake_client.clone()),
1582            Some(fake_client.clone()),
1583        ));
1584        let store = Arc::new(MemStore::new());
1585        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1586        let transaction_certifier =
1587            TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
1588        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1589        let synchronizer = Synchronizer::start(
1590            network_client,
1591            context.clone(),
1592            core_dispatcher.clone(),
1593            commit_vote_monitor.clone(),
1594            block_verifier.clone(),
1595            transaction_certifier.clone(),
1596            round_tracker.clone(),
1597            dag_state.clone(),
1598            false,
1599        );
1600
1601        // No blocks added to DagState - only genesis exists
1602
1603        let authority_service = Arc::new(AuthorityService::new(
1604            context.clone(),
1605            block_verifier,
1606            commit_vote_monitor,
1607            round_tracker,
1608            synchronizer,
1609            core_dispatcher.clone(),
1610            rx_block_broadcast,
1611            transaction_certifier,
1612            dag_state.clone(),
1613            store,
1614        ));
1615
1616        let peer = context.committee.to_authority_index(1).unwrap();
1617
1618        // Subscribe - no blocks have been proposed yet (only genesis exists)
1619        let mut stream = authority_service
1620            .handle_subscribe_blocks(peer, 0)
1621            .await
1622            .unwrap();
1623
1624        // Should NOT receive any block (genesis must not be returned)
1625        use futures::poll;
1626        use std::task::Poll;
1627        let poll_result = poll!(stream.next());
1628        assert!(
1629            matches!(poll_result, Poll::Pending),
1630            "Should not receive genesis block on subscription stream"
1631        );
1632    }
1633}