// consensus_core/authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::Reverse,
6    collections::{BTreeMap, BTreeSet, BinaryHeap},
7    pin::Pin,
8    sync::Arc,
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockRef, Round};
16use futures::{Stream, StreamExt, ready, stream, task};
17use mysten_metrics::spawn_monitored_task;
18use parking_lot::RwLock;
19use rand::seq::SliceRandom as _;
20use sui_macros::fail_point_async;
21use tap::TapFallible;
22use tokio::sync::broadcast;
23use tokio_util::sync::ReusableBoxFuture;
24use tracing::{debug, info, warn};
25
26use crate::{
27    CommitIndex,
28    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
29    block_verifier::BlockVerifier,
30    commit::{CommitAPI as _, CommitRange, TrustedCommit},
31    commit_vote_monitor::CommitVoteMonitor,
32    context::Context,
33    core_thread::CoreThreadDispatcher,
34    dag_state::DagState,
35    error::{ConsensusError, ConsensusResult},
36    network::{
37        BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream, ObserverBlockStreamItem,
38        ObserverNetworkService, PeerId, ValidatorNetworkService,
39    },
40    round_tracker::RoundTracker,
41    stake_aggregator::{QuorumThreshold, StakeAggregator},
42    storage::Store,
43    synchronizer::SynchronizerHandle,
44    transaction_vote_tracker::TransactionVoteTracker,
45};
46
/// Multiplier applied to `commit_sync_batch_size` to form the commit-lag threshold:
/// incoming blocks are rejected when the local commit index trails the quorum commit
/// index by more than `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
48
/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    /// Shared node context: committee, parameters, protocol config, clock and metrics.
    context: Arc<Context>,
    /// Observes commit votes carried by received blocks to estimate the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Verifies incoming signed blocks and produces this node's transaction votes.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Handle used to schedule background fetches of missing blocks from peers.
    synchronizer: Arc<SynchronizerHandle>,
    /// Dispatches verified blocks to the Core thread for acceptance into the DAG.
    core_dispatcher: Arc<C>,
    /// Broadcast receiver for this node's blocks; resubscribed per peer subscription.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    /// Counts active block-stream subscriptions per peer (also drives subscription metrics).
    subscription_counter: Arc<SubscriptionCounter>,
    /// Records this node's votes on transactions of received blocks.
    transaction_vote_tracker: TransactionVoteTracker,
    /// In-memory DAG view: cached blocks, last proposed block and last commit index.
    dag_state: Arc<RwLock<DagState>>,
    /// Persistent storage for blocks, commits and commit votes.
    store: Arc<dyn Store>,
    /// Tracks received/accepted rounds per authority, updated from verified blocks.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
63
64impl<C: CoreThreadDispatcher> AuthorityService<C> {
65    pub(crate) fn new(
66        context: Arc<Context>,
67        block_verifier: Arc<dyn BlockVerifier>,
68        commit_vote_monitor: Arc<CommitVoteMonitor>,
69        round_tracker: Arc<RwLock<RoundTracker>>,
70        synchronizer: Arc<SynchronizerHandle>,
71        core_dispatcher: Arc<C>,
72        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
73        transaction_vote_tracker: TransactionVoteTracker,
74        dag_state: Arc<RwLock<DagState>>,
75        store: Arc<dyn Store>,
76    ) -> Self {
77        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
78        Self {
79            context,
80            block_verifier,
81            commit_vote_monitor,
82            synchronizer,
83            core_dispatcher,
84            rx_block_broadcast,
85            subscription_counter,
86            transaction_vote_tracker,
87            dag_state,
88            store,
89            round_tracker,
90        }
91    }
92
93    // Parses and validates serialized excluded ancestors.
94    fn parse_excluded_ancestors(
95        &self,
96        peer: AuthorityIndex,
97        block: &VerifiedBlock,
98        mut excluded_ancestors: Vec<Vec<u8>>,
99    ) -> ConsensusResult<Vec<BlockRef>> {
100        let peer_hostname = &self.context.committee.authority(peer).hostname;
101
102        let excluded_ancestors_limit = self.context.committee.size() * 2;
103        if excluded_ancestors.len() > excluded_ancestors_limit {
104            debug!(
105                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
106                excluded_ancestors.len() - excluded_ancestors_limit,
107                peer,
108                peer_hostname,
109            );
110            excluded_ancestors.truncate(excluded_ancestors_limit);
111        }
112
113        let excluded_ancestors = excluded_ancestors
114            .into_iter()
115            .map(|serialized| {
116                let block_ref: BlockRef =
117                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
118                if !self.context.committee.is_valid_index(block_ref.author) {
119                    return Err(ConsensusError::InvalidAuthorityIndex {
120                        index: block_ref.author,
121                        max: self.context.committee.size(),
122                    });
123                }
124                if block_ref.round >= block.round() {
125                    return Err(ConsensusError::InvalidAncestorRound {
126                        ancestor: block_ref.round,
127                        block: block.round(),
128                    });
129                }
130                Ok(block_ref)
131            })
132            .collect::<ConsensusResult<Vec<BlockRef>>>()?;
133
134        for excluded_ancestor in &excluded_ancestors {
135            let excluded_ancestor_hostname = &self
136                .context
137                .committee
138                .authority(excluded_ancestor.author)
139                .hostname;
140            self.context
141                .metrics
142                .node_metrics
143                .network_excluded_ancestors_count_by_authority
144                .with_label_values(&[excluded_ancestor_hostname])
145                .inc();
146        }
147        self.context
148            .metrics
149            .node_metrics
150            .network_received_excluded_ancestors_from_authority
151            .with_label_values(&[peer_hostname])
152            .inc_by(excluded_ancestors.len() as u64);
153
154        Ok(excluded_ancestors)
155    }
156}
157
#[async_trait]
impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
    /// Handles a block pushed by `peer`: deserializes and verifies it, records
    /// commit votes and (when enabled) own transaction votes, forwards it to Core
    /// for acceptance into the DAG, and schedules background fetches of missing
    /// ancestors and missing excluded ancestors.
    ///
    /// Statement order here is deliberate — see the inline IMPORTANT note below.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing parsing and validations.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // Measure how far ahead of local time the block's timestamp is
        // (saturating, so blocks from the past contribute 0).
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the block for the commit votes. When local commit is lagging too much,
        // commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Update own received rounds and peer accepted rounds from this verified block.
        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject blocks when local commit index is lagging too far from quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // The block is verified and current, so record own votes on the block
        // before sending the block to Core.
        if self.context.protocol_config.transaction_voting_enabled() {
            self.transaction_vote_tracker
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Send the block to Core to try accepting it into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Schedule fetching missing soft links from this peer in the background.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }

    /// Subscribes `peer` to this node's block stream. The returned stream first
    /// yields own cached blocks with rounds above `last_received`, then every
    /// block broadcast after the subscription was created.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        // Find past proposed blocks as the initial blocks to send to the peer.
        //
        // If there are cached blocks in the range which the peer requested, send all of them.
        // The size is bounded by the local GC round and DagState cache size.
        //
        // Otherwise if there is no cached block in the range which the peer requested,
        // and this node has proposed blocks before, at least one block should be sent to the peer
        // to help with liveness.
        let past_proposed_blocks = {
            let dag_state = self.dag_state.read();

            let mut proposed_blocks =
                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
            if proposed_blocks.is_empty() {
                let last_proposed_block = dag_state
                    .get_last_proposed_block()
                    .expect("Last proposed block should be returned on validators");
                // A genesis block carries no useful information for the peer, so send nothing.
                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
                    vec![last_proposed_block]
                } else {
                    vec![]
                };
            }
            stream::iter(
                proposed_blocks
                    .into_iter()
                    .map(|block| ExtendedSerializedBlock {
                        block: block.serialized().clone(),
                        excluded_ancestors: vec![],
                    }),
            )
        };

        let broadcasted_blocks = BroadcastedBlockStream::new(
            PeerId::Validator(peer),
            self.rx_block_broadcast.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
        Ok(Box::pin(past_proposed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles 3 types of requests:
    // 1. Live sync:
    //    - Both missing block refs and highest accepted rounds are specified.
    //    - fetch_missing_ancestors is true.
    //    - response returns max_blocks_per_sync blocks.
    // 2. Periodic sync:
    //    - Highest accepted rounds must be specified.
    //    - Missing block refs are optional.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    // 3. Commit sync:
    //    - Missing block refs are specified.
    //    - Highest accepted rounds are empty.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // Reject request shapes that match none of the 3 supported modes above.
        if block_refs.is_empty() && (fetch_missing_ancestors || fetch_after_rounds.is_empty()) {
            return Err(ConsensusError::InvalidFetchBlocksRequest("When no block refs are provided, fetch_after_rounds must be provided and fetch_missing_ancestors must be false".to_string()));
        }
        // When provided, fetch_after_rounds must have one entry per committee member.
        if !fetch_after_rounds.is_empty()
            && fetch_after_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                fetch_after_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Finds the suitable limit of # of blocks to return.
        let max_response_num_blocks = if !fetch_after_rounds.is_empty() && !block_refs.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get the requested blocks first.
        let mut blocks = self
            .dag_state
            .read()
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        // When fetch_missing_ancestors is true, fetch missing ancestors of the requested blocks.
        // Otherwise, fetch additional blocks depth-first from the requested block authorities.
        if blocks.len() < max_response_num_blocks && !fetch_after_rounds.is_empty() {
            if fetch_missing_ancestors {
                // Get unique missing ancestor blocks of the requested blocks (validated to be non-empty).
                // fetch_after_rounds will only be used to filter out already accepted blocks.
                let missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| fetch_after_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks
                    .saturating_sub(blocks.len())
                    .min(missing_ancestors.len());
                if selected_num_blocks > 0 {
                    let selected_ancestor_refs = missing_ancestors
                        .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                    let ancestor_blocks = self
                        .dag_state
                        .read()
                        .get_blocks(&selected_ancestor_refs)
                        .into_iter()
                        .flatten();
                    blocks.extend(ancestor_blocks);
                }
            } else {
                // Get additional blocks from authorities with missing block.
                // Compute the fetch round per requested authority, or all authorities.
                let mut limit_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                if block_refs.is_empty() {
                    let dag_state = self.dag_state.read();
                    for (index, _authority) in self.context.committee.authorities() {
                        let last_block = dag_state.get_last_block_for_authority(index);
                        limit_rounds.insert(index, last_block.round());
                    }
                } else {
                    // Use the lowest requested round per authority as the (exclusive) limit.
                    for block_ref in &block_refs {
                        let entry = limit_rounds
                            .entry(block_ref.author)
                            .or_insert(block_ref.round);
                        *entry = (*entry).min(block_ref.round);
                    }
                }

                // Use a min-heap to fetch blocks across authorities in ascending round order.
                // Each entry is (fetch_start_round, authority, limit_round).
                let mut heap = BinaryHeap::new();
                for (authority, limit_round) in &limit_rounds {
                    let fetch_start = fetch_after_rounds[*authority] + 1;
                    if fetch_start < *limit_round {
                        heap.push(Reverse((fetch_start, *authority, *limit_round)));
                    }
                }

                // Pop the authority with the lowest next fetchable round, fetch one block,
                // and re-queue the authority at the round after the fetched block.
                while let Some(Reverse((fetch_start, authority, limit_round))) = heap.pop() {
                    let fetched = self.store.scan_blocks_by_author_in_range(
                        authority,
                        fetch_start,
                        limit_round,
                        1,
                    )?;
                    if let Some(block) = fetched.into_iter().next() {
                        let next_start = block.round() + 1;
                        blocks.push(block);
                        if blocks.len() >= max_response_num_blocks {
                            blocks.truncate(max_response_num_blocks);
                            break;
                        }
                        if next_start < limit_round {
                            heap.push(Reverse((next_start, authority, limit_round)));
                        }
                    }
                }
            }
        }

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

    /// Fetches a batch of commits in `commit_range` (capped at
    /// `commit_sync_batch_size`), together with the blocks whose votes certify
    /// the last returned commit. Trailing commits that have not yet gathered a
    /// quorum of votes locally are dropped from the response.
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        // Walk backwards from the last scanned commit until one with a quorum
        // of votes is found; uncertified trailing commits are popped off.
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

    /// Returns the serialized latest accepted block of each requested authority,
    /// omitting genesis blocks. Rejects invalid or oversized authority lists.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that those are valid authorities
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended blocks. Ideally we
        // want in the future if we think we would like to tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // no reason to serve back the genesis block - it's equal as if it has not received any block
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialised blocks
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    /// Returns, per authority, this node's highest received rounds (from the
    /// round tracker) and highest accepted rounds (from cached DAG state).
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}
672
673#[async_trait]
674impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
675    async fn handle_block(
676        &self,
677        _peer: PeerId,
678        _item: ObserverBlockStreamItem,
679    ) -> ConsensusResult<()> {
680        // TODO: implement observer block handling, similar to validator block handling.
681        Err(ConsensusError::NetworkRequest(
682            "Observer block handling not yet implemented".to_string(),
683        ))
684    }
685
686    async fn handle_stream_blocks(
687        &self,
688        _peer: NodeId,
689        _highest_round_per_authority: Vec<u64>,
690    ) -> ConsensusResult<ObserverBlockStream> {
691        // TODO: Implement observer block streaming
692        todo!("Observer block streaming not yet implemented")
693    }
694
695    async fn handle_fetch_blocks(
696        &self,
697        _peer: NodeId,
698        _block_refs: Vec<BlockRef>,
699    ) -> ConsensusResult<Vec<Bytes>> {
700        // TODO: implement observer fetch blocks, similar to validator fetch_blocks but
701        // without highest_accepted_rounds.
702        Err(ConsensusError::NetworkRequest(
703            "Observer fetch blocks not yet implemented".to_string(),
704        ))
705    }
706
707    async fn handle_fetch_commits(
708        &self,
709        _peer: NodeId,
710        _commit_range: CommitRange,
711    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
712        // TODO: implement observer fetch commits, similar to validator fetch_commits.
713        Err(ConsensusError::NetworkRequest(
714            "Observer fetch commits not yet implemented".to_string(),
715        ))
716    }
717}
/// Mutable subscription state, guarded by the mutex in `SubscriptionCounter`.
struct Counter {
    // Total number of active subscriptions across all peers.
    count: usize,
    // Number of active subscriptions per individual peer.
    subscriptions_by_peer: BTreeMap<PeerId, usize>,
}
722
/// Atomically counts the number of active subscriptions to the block broadcast stream.
pub(crate) struct SubscriptionCounter {
    // Node context, used to update the `subscribed_by` metrics.
    context: Arc<Context>,
    // Subscription counts; a mutex keeps count updates and metric reads consistent.
    counter: parking_lot::Mutex<Counter>,
}
728
729impl SubscriptionCounter {
730    pub(crate) fn new(context: Arc<Context>) -> Self {
731        // Set the subscribed peers by default to 0
732        for (_, authority) in context.committee.authorities() {
733            context
734                .metrics
735                .node_metrics
736                .subscribed_by
737                .with_label_values(&[authority.hostname.as_str()])
738                .set(0);
739        }
740
741        Self {
742            counter: parking_lot::Mutex::new(Counter {
743                count: 0,
744                subscriptions_by_peer: BTreeMap::new(),
745            }),
746            context,
747        }
748    }
749
750    fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
751        let mut counter = self.counter.lock();
752        counter.count += 1;
753        *counter
754            .subscriptions_by_peer
755            .entry(peer.clone())
756            .or_insert(0) += 1;
757
758        match peer {
759            PeerId::Validator(authority) => {
760                let peer_hostname = &self.context.committee.authority(*authority).hostname;
761                self.context
762                    .metrics
763                    .node_metrics
764                    .subscribed_by
765                    .with_label_values(&[peer_hostname])
766                    .set(1);
767            }
768            PeerId::Observer(_) => {
769                self.context
770                    .metrics
771                    .node_metrics
772                    .subscribed_by
773                    .with_label_values(&["observer"])
774                    .inc();
775            }
776        }
777
778        Ok(())
779    }
780
781    fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
782        let mut counter = self.counter.lock();
783        counter.count -= 1;
784        *counter
785            .subscriptions_by_peer
786            .entry(peer.clone())
787            .or_insert(0) -= 1;
788
789        if counter.subscriptions_by_peer[peer] == 0 {
790            match peer {
791                PeerId::Validator(authority) => {
792                    let peer_hostname = &self.context.committee.authority(*authority).hostname;
793                    self.context
794                        .metrics
795                        .node_metrics
796                        .subscribed_by
797                        .with_label_values(&[peer_hostname])
798                        .set(0);
799                }
800                PeerId::Observer(_) => {
801                    self.context
802                        .metrics
803                        .node_metrics
804                        .subscribed_by
805                        .with_label_values(&["observer"])
806                        .dec();
807                }
808            }
809        }
810
811        Ok(())
812    }
813}
814
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created; blocks
/// sent before subscription are never observed by this stream.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
818
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
pub(crate) struct BroadcastStream<T> {
    // Identifies the subscriber; used in log messages and subscription accounting.
    peer: PeerId,
    // Stores the receiver across poll_next() calls.
    // The future resolves to the received item together with the receiver
    // itself, so the receiver can be moved back in to arm the next receive.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    // Incremented in new() and decremented on drop.
    subscription_counter: Arc<SubscriptionCounter>,
}
834
835impl<T: 'static + Clone + Send> BroadcastStream<T> {
836    pub fn new(
837        peer: PeerId,
838        rx: broadcast::Receiver<T>,
839        subscription_counter: Arc<SubscriptionCounter>,
840    ) -> Self {
841        if let Err(err) = subscription_counter.increment(&peer) {
842            match err {
843                ConsensusError::Shutdown => {}
844                _ => panic!("Unexpected error: {err}"),
845            }
846        }
847        Self {
848            peer,
849            inner: ReusableBoxFuture::new(make_recv_future(rx)),
850            subscription_counter,
851        }
852    }
853}
854
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    /// Yields the next broadcast item. Lags are logged and polling continues,
    /// so callers never see lag errors; `None` is yielded only when the
    /// broadcast channel is closed.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer.clone();
        let maybe_item = loop {
            // Poll the stored recv future; on completion it hands the receiver
            // back so it can be re-armed below for the next message.
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    // All senders dropped: terminate the stream.
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    // Unlike tokio's BroadcastStream, lagging does not surface
                    // an error item; log and keep receiving.
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}
885
886impl<T> Drop for BroadcastStream<T> {
887    fn drop(&mut self) {
888        if let Err(err) = self.subscription_counter.decrement(&self.peer) {
889            match err {
890                ConsensusError::Shutdown => {}
891                _ => panic!("Unexpected error: {err}"),
892            }
893        }
894    }
895}
896
897async fn make_recv_future<T: Clone>(
898    mut rx: broadcast::Receiver<T>,
899) -> (
900    Result<T, broadcast::error::RecvError>,
901    broadcast::Receiver<T>,
902) {
903    let result = rx.recv().await;
904    (result, rx)
905}
906
907// TODO: add a unit test for BroadcastStream.
908
909#[cfg(test)]
910mod tests {
911    use std::{
912        collections::{BTreeMap, BTreeSet},
913        sync::Arc,
914        time::Duration,
915    };
916
917    use async_trait::async_trait;
918    use bytes::Bytes;
919    use consensus_config::AuthorityIndex;
920    use consensus_types::block::{BlockDigest, BlockRef, Round};
921    use parking_lot::{Mutex, RwLock};
922    use tokio::{sync::broadcast, time::sleep};
923
924    use futures::StreamExt as _;
925
926    use crate::{
927        authority_service::AuthorityService,
928        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
929        commit::{CertifiedCommits, CommitRange},
930        commit_vote_monitor::CommitVoteMonitor,
931        context::Context,
932        core_thread::{CoreError, CoreThreadDispatcher},
933        dag_state::DagState,
934        error::ConsensusResult,
935        network::{
936            BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
937            ValidatorNetworkClient, ValidatorNetworkService,
938        },
939        round_tracker::RoundTracker,
940        storage::mem_store::MemStore,
941        synchronizer::Synchronizer,
942        test_dag_builder::DagBuilder,
943        transaction_vote_tracker::TransactionVoteTracker,
944    };
    /// Test double for `CoreThreadDispatcher` that records every block passed
    /// to `add_blocks` so tests can inspect what the service dispatched.
    struct FakeCoreThreadDispatcher {
        // Blocks received via `add_blocks`, in arrival order.
        blocks: Mutex<Vec<VerifiedBlock>>,
    }
948
949    impl FakeCoreThreadDispatcher {
950        fn new() -> Self {
951            Self {
952                blocks: Mutex::new(vec![]),
953            }
954        }
955
956        fn get_blocks(&self) -> Vec<VerifiedBlock> {
957            self.blocks.lock().clone()
958        }
959    }
960
961    #[async_trait]
962    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
963        async fn add_blocks(
964            &self,
965            blocks: Vec<VerifiedBlock>,
966        ) -> Result<BTreeSet<BlockRef>, CoreError> {
967            let block_refs = blocks.iter().map(|b| b.reference()).collect();
968            self.blocks.lock().extend(blocks);
969            Ok(block_refs)
970        }
971
972        async fn check_block_refs(
973            &self,
974            _block_refs: Vec<BlockRef>,
975        ) -> Result<BTreeSet<BlockRef>, CoreError> {
976            Ok(BTreeSet::new())
977        }
978
979        async fn add_certified_commits(
980            &self,
981            _commits: CertifiedCommits,
982        ) -> Result<BTreeSet<BlockRef>, CoreError> {
983            todo!()
984        }
985
986        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
987            Ok(())
988        }
989
990        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
991            Ok(Default::default())
992        }
993
994        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
995            todo!()
996        }
997
998        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
999            todo!()
1000        }
1001    }
1002
    /// Stub network client used only to satisfy constructor signatures; the
    /// tests here never exercise the network, so every method panics if called.
    #[derive(Default)]
    struct FakeNetworkClient {}
1005
    // All methods are unreachable stubs: these tests construct a
    // SynchronizerClient but never trigger outbound validator RPCs.
    #[async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _fetch_after_rounds: Vec<Round>,
            _fetch_missing_ancestors: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1063
    // All methods are unreachable stubs: the observer RPC paths are never
    // triggered by these tests.
    #[async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _highest_round_per_authority: Vec<u64>,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
1093
    /// Verifies `handle_send_block`: a block with a future timestamp is held
    /// until its timestamp passes before being dispatched to the core, and
    /// blocks with an invalid author or malformed excluded ancestors are
    /// rejected.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        // Build a block whose timestamp is exactly max_drift in the future.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Handle the block on a separate task; with start_paused, the runtime
        // auto-advances time across the service's internal delay.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        sleep(max_drift / 2).await;

        // The delayed block should have been dispatched to the core by now.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block.
        // Author index 1000 is outside the 4-authority committee.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors.
        // Mix of: out-of-committee author, garbage bytes, and an invalid ref.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1207
    /// Verifies `handle_fetch_blocks` across four scenarios: fetching with
    /// missing-ancestor expansion, without expansion, with only
    /// highest-accepted-rounds (no explicit refs), and with explicit refs but
    /// no accepted rounds — checking the per-request block limits each time.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN
        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        context.parameters.max_blocks_per_fetch = 50;
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            store,
        ));

        // GIVEN: 40 rounds of blocks in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        dag_state.write().flush();
        let all_blocks = dag_builder.all_blocks();

        // WHEN: Request 2 blocks from round 40, fetch missing ancestors enabled.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        // Deserialize the returned bytes and index the blocks by reference.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // The remainder of the budget is filled with round-39 ancestors.
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: Request 2 blocks from round 37, fetch missing ancestors disabled.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Try to fill up the blocks from the 1st authority in missing_block_refs.
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Ancestor blocks are from the expected rounds and authorities.
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: Request with empty block_refs, fetch missing ancestors disabled.
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Set a few authorities to higher accepted rounds.
        highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
        highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                vec![],
                highest_accepted_rounds.clone(),
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
        // Blocks should be from all authorities, within the expected round range.
        for block_ref in blocks.keys() {
            let accepted = highest_accepted_rounds[block_ref.author];
            assert!(block_ref.round > accepted);
        }
        // Blocks should be fetched in ascending round order across authorities,
        // so blocks should have low rounds near the accepted rounds.
        let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
        // With 40 authorities mostly at accepted round 1 and max_blocks_per_fetch=50,
        // the min-heap fills ~1-2 rounds per authority.
        assert!(
            max_round_in_result <= 4,
            "Expected low rounds from fair round-order fetching, got max round {}",
            max_round_in_result
        );

        // WHEN: Request 5 block from round 40, not getting ancestors.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }
1420
    /// Verifies `handle_fetch_latest_blocks` returns the latest (round 10)
    /// blocks for the requested authorities, including when an authority has
    /// equivocated.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        // Every returned block should be from the highest round built (10).
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1493
    /// Verifies `handle_subscribe_blocks`: subscribing past the last proposed
    /// round falls back to replaying the last proposed block, while subscribing
    /// behind it replays all cached own blocks after `last_received`.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
        // Should return last proposed block (round 15) as fallback
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 100)
                .await
                .unwrap();
            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(
                block.round(),
                15,
                "Should return last proposed block as fallback"
            );
            assert_eq!(block.author().value(), 0);
        }

        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
        // Should return cached blocks from round 8+
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 7)
                .await
                .unwrap();

            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block1.round(), 10, "Should return block at round 10");

            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block2.round(), 15, "Should return block at round 15");
        }
    }
1582
1583    #[tokio::test(flavor = "current_thread", start_paused = true)]
1584    async fn test_handle_subscribe_blocks_not_proposed() {
1585        let (context, _keys) = Context::new_for_test(4);
1586        let context = Arc::new(context);
1587        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1588        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1589        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1590        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1591        let fake_client = Arc::new(FakeNetworkClient::default());
1592        let network_client = Arc::new(SynchronizerClient::new(
1593            context.clone(),
1594            Some(fake_client.clone()),
1595            Some(fake_client.clone()),
1596        ));
1597        let store = Arc::new(MemStore::new());
1598        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1599        let transaction_vote_tracker =
1600            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1601        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1602        let synchronizer = Synchronizer::start(
1603            network_client,
1604            context.clone(),
1605            core_dispatcher.clone(),
1606            commit_vote_monitor.clone(),
1607            block_verifier.clone(),
1608            transaction_vote_tracker.clone(),
1609            round_tracker.clone(),
1610            dag_state.clone(),
1611            false,
1612        );
1613
1614        // No blocks added to DagState - only genesis exists
1615
1616        let authority_service = Arc::new(AuthorityService::new(
1617            context.clone(),
1618            block_verifier,
1619            commit_vote_monitor,
1620            round_tracker,
1621            synchronizer,
1622            core_dispatcher.clone(),
1623            rx_block_broadcast,
1624            transaction_vote_tracker,
1625            dag_state.clone(),
1626            store,
1627        ));
1628
1629        let peer = context.committee.to_authority_index(1).unwrap();
1630
1631        // Subscribe - no blocks have been proposed yet (only genesis exists)
1632        let mut stream = authority_service
1633            .handle_subscribe_blocks(peer, 0)
1634            .await
1635            .unwrap();
1636
1637        // Should NOT receive any block (genesis must not be returned)
1638        use futures::poll;
1639        use std::task::Poll;
1640        let poll_result = poll!(stream.next());
1641        assert!(
1642            matches!(poll_result, Poll::Pending),
1643            "Should not receive genesis block on subscription stream"
1644        );
1645    }
1646}