// consensus_core/authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::Reverse,
6    collections::{BTreeMap, BTreeSet, BinaryHeap},
7    pin::Pin,
8    sync::Arc,
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockRef, Round};
16use futures::{Stream, StreamExt, ready, stream, task};
17use mysten_metrics::spawn_monitored_task;
18use parking_lot::RwLock;
19use rand::seq::SliceRandom as _;
20use sui_macros::fail_point_async;
21use tap::TapFallible;
22use tokio::sync::broadcast;
23use tokio_util::sync::ReusableBoxFuture;
24use tracing::{debug, info, warn};
25
26use crate::{
27    CommitIndex,
28    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
29    block_verifier::BlockVerifier,
30    commit::{CommitAPI as _, CommitRange, TrustedCommit},
31    commit_vote_monitor::CommitVoteMonitor,
32    context::Context,
33    core_thread::CoreThreadDispatcher,
34    dag_state::DagState,
35    error::{ConsensusError, ConsensusResult},
36    network::{
37        BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream, ObserverBlockStreamItem,
38        ObserverNetworkService, PeerId, ValidatorNetworkService,
39    },
40    round_tracker::RoundTracker,
41    stake_aggregator::{QuorumThreshold, StakeAggregator},
42    storage::Store,
43    synchronizer::SynchronizerHandle,
44    transaction_vote_tracker::TransactionVoteTracker,
45};
46
47pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
48
/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    // Shared node context: committee, parameters, metrics and clock.
    context: Arc<Context>,
    // Observes commit votes carried by received blocks, to estimate the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Verifies incoming signed blocks and produces this node's transaction reject votes.
    block_verifier: Arc<dyn BlockVerifier>,
    // Used to schedule background fetches of missing (excluded) ancestors.
    synchronizer: Arc<SynchronizerHandle>,
    // Dispatches verified blocks to the Core thread for acceptance into the DAG.
    core_dispatcher: Arc<C>,
    // Receiver for locally proposed blocks, fanned out to block-stream subscribers.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    // Tracks active block-stream subscriptions per peer, for metrics.
    subscription_counter: Arc<SubscriptionCounter>,
    // Records this node's votes on transactions in received blocks.
    transaction_vote_tracker: TransactionVoteTracker,
    // Local DAG state: cached blocks and last commit information.
    dag_state: Arc<RwLock<DagState>>,
    // Persistent storage for blocks, commits and commit votes.
    store: Arc<dyn Store>,
    // Tracks highest received/accepted rounds per authority.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
63
64impl<C: CoreThreadDispatcher> AuthorityService<C> {
65    pub(crate) fn new(
66        context: Arc<Context>,
67        block_verifier: Arc<dyn BlockVerifier>,
68        commit_vote_monitor: Arc<CommitVoteMonitor>,
69        round_tracker: Arc<RwLock<RoundTracker>>,
70        synchronizer: Arc<SynchronizerHandle>,
71        core_dispatcher: Arc<C>,
72        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
73        transaction_vote_tracker: TransactionVoteTracker,
74        dag_state: Arc<RwLock<DagState>>,
75        store: Arc<dyn Store>,
76    ) -> Self {
77        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
78        Self {
79            context,
80            block_verifier,
81            commit_vote_monitor,
82            synchronizer,
83            core_dispatcher,
84            rx_block_broadcast,
85            subscription_counter,
86            transaction_vote_tracker,
87            dag_state,
88            store,
89            round_tracker,
90        }
91    }
92
93    // Parses and validates serialized excluded ancestors.
94    fn parse_excluded_ancestors(
95        &self,
96        peer: AuthorityIndex,
97        block: &VerifiedBlock,
98        mut excluded_ancestors: Vec<Vec<u8>>,
99    ) -> ConsensusResult<Vec<BlockRef>> {
100        let peer_hostname = &self.context.committee.authority(peer).hostname;
101
102        let excluded_ancestors_limit = self.context.committee.size() * 2;
103        if excluded_ancestors.len() > excluded_ancestors_limit {
104            debug!(
105                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
106                excluded_ancestors.len() - excluded_ancestors_limit,
107                peer,
108                peer_hostname,
109            );
110            excluded_ancestors.truncate(excluded_ancestors_limit);
111        }
112
113        let excluded_ancestors = excluded_ancestors
114            .into_iter()
115            .map(|serialized| {
116                let block_ref: BlockRef =
117                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
118                if !self.context.committee.is_valid_index(block_ref.author) {
119                    return Err(ConsensusError::InvalidAuthorityIndex {
120                        index: block_ref.author,
121                        max: self.context.committee.size(),
122                    });
123                }
124                if block_ref.round >= block.round() {
125                    return Err(ConsensusError::InvalidAncestorRound {
126                        ancestor: block_ref.round,
127                        block: block.round(),
128                    });
129                }
130                Ok(block_ref)
131            })
132            .collect::<ConsensusResult<Vec<BlockRef>>>()?;
133
134        for excluded_ancestor in &excluded_ancestors {
135            let excluded_ancestor_hostname = &self
136                .context
137                .committee
138                .authority(excluded_ancestor.author)
139                .hostname;
140            self.context
141                .metrics
142                .node_metrics
143                .network_excluded_ancestors_count_by_authority
144                .with_label_values(&[excluded_ancestor_hostname])
145                .inc();
146        }
147        self.context
148            .metrics
149            .node_metrics
150            .network_received_excluded_ancestors_from_authority
151            .with_label_values(&[peer_hostname])
152            .inc_by(excluded_ancestors.len() as u64);
153
154        Ok(excluded_ancestors)
155    }
156}
157
158#[async_trait]
159impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
    /// Handles a block pushed directly by a peer validator.
    ///
    /// Pipeline: deserialize; reject blocks not authored by the sender; verify
    /// and collect own transaction reject votes; parse excluded ancestors;
    /// record commit votes and round info; reject if local commits lag the
    /// quorum commit index too far; record txn votes; send the block to Core;
    /// and schedule background fetches for missing (excluded) ancestors.
    ///
    /// NOTE: step ordering matters — commit votes must be observed before the
    /// commit-lag rejection check (see inline comment below).
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing parsing and validations.
        // Also returns this node's reject votes on the block's transactions.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // Forward drift: how far the block timestamp is ahead of the local clock.
        // saturating_sub yields 0 when the block timestamp is not in the future.
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the block for the commit votes. When local commit is lagging too much,
        // commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Update own received rounds and peer accepted rounds from this verified block.
        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject blocks when local commit index is lagging too far from quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // The block is verified and current, so record own votes on the block
        // before sending the block to Core.
        if self.context.protocol_config.transaction_voting_enabled() {
            self.transaction_vote_tracker
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Send the block to Core to try accepting it into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Schedule fetching missing soft links from this peer in the background.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // Best-effort, same as the missing-ancestor fetch above.
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }
344
345    async fn handle_subscribe_blocks(
346        &self,
347        peer: AuthorityIndex,
348        last_received: Round,
349    ) -> ConsensusResult<BlockStream> {
350        fail_point_async!("consensus-rpc-response");
351
352        // Find past proposed blocks as the initial blocks to send to the peer.
353        //
354        // If there are cached blocks in the range which the peer requested, send all of them.
355        // The size is bounded by the local GC round and DagState cache size.
356        //
357        // Otherwise if there is no cached block in the range which the peer requested,
358        // and this node has proposed blocks before, at least one block should be sent to the peer
359        // to help with liveness.
360        let past_proposed_blocks = {
361            let dag_state = self.dag_state.read();
362
363            let mut proposed_blocks =
364                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
365            if proposed_blocks.is_empty() {
366                let last_proposed_block = dag_state.get_last_proposed_block();
367                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
368                    vec![last_proposed_block]
369                } else {
370                    vec![]
371                };
372            }
373            stream::iter(
374                proposed_blocks
375                    .into_iter()
376                    .map(|block| ExtendedSerializedBlock {
377                        block: block.serialized().clone(),
378                        excluded_ancestors: vec![],
379                    }),
380            )
381        };
382
383        let broadcasted_blocks = BroadcastedBlockStream::new(
384            PeerId::Validator(peer),
385            self.rx_block_broadcast.resubscribe(),
386            self.subscription_counter.clone(),
387        );
388
389        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
390        Ok(Box::pin(past_proposed_blocks.chain(
391            broadcasted_blocks.map(ExtendedSerializedBlock::from),
392        )))
393    }
394
    // Handles 3 types of requests:
    // 1. Live sync:
    //    - Both missing block refs and highest accepted rounds are specified.
    //    - fetch_missing_ancestors is true.
    //    - response returns max_blocks_per_sync blocks.
    // 2. Periodic sync:
    //    - Highest accepted rounds must be specified.
    //    - Missing block refs are optional.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    // 3. Commit sync:
    //    - Missing block refs are specified.
    //    - Highest accepted rounds are empty.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    //
    // `fetch_after_rounds`, when non-empty, must have one entry per committee
    // member: the highest round the requester has already accepted from each
    // authority. Only blocks above those rounds are worth returning.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // Request shape validation: an empty block_refs list is only legal for
        // periodic sync (rounds given, no ancestor expansion).
        if block_refs.is_empty() && (fetch_missing_ancestors || fetch_after_rounds.is_empty()) {
            return Err(ConsensusError::InvalidFetchBlocksRequest("When no block refs are provided, fetch_after_rounds must be provided and fetch_missing_ancestors must be false".to_string()));
        }
        if !fetch_after_rounds.is_empty()
            && fetch_after_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                fetch_after_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Finds the suitable limit of # of blocks to return.
        let max_response_num_blocks = if !fetch_after_rounds.is_empty() && !block_refs.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get the requested blocks first. Refs not found locally are silently
        // dropped (get_blocks returns an Option per ref, flattened here).
        let mut blocks = self
            .dag_state
            .read()
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        // When fetch_missing_ancestors is true, fetch missing ancestors of the requested blocks.
        // Otherwise, fetch additional blocks depth-first from the requested block authorities.
        if blocks.len() < max_response_num_blocks && !fetch_after_rounds.is_empty() {
            if fetch_missing_ancestors {
                // Get unique missing ancestor blocks of the requested blocks (validated to be non-empty).
                // fetch_after_rounds will only be used to filter out already accepted blocks.
                let missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| fetch_after_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks
                    .saturating_sub(blocks.len())
                    .min(missing_ancestors.len());
                if selected_num_blocks > 0 {
                    let selected_ancestor_refs = missing_ancestors
                        .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                    let ancestor_blocks = self
                        .dag_state
                        .read()
                        .get_blocks(&selected_ancestor_refs)
                        .into_iter()
                        .flatten();
                    blocks.extend(ancestor_blocks);
                }
            } else {
                // Get additional blocks from authorities with missing block.
                // Compute the fetch round per requested authority, or all authorities.
                let mut limit_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                if block_refs.is_empty() {
                    // No specific refs: every authority's last known round caps the scan.
                    let dag_state = self.dag_state.read();
                    for (index, _authority) in self.context.committee.authorities() {
                        let last_block = dag_state.get_last_block_for_authority(index);
                        limit_rounds.insert(index, last_block.round());
                    }
                } else {
                    // Cap each authority's scan at its lowest requested round.
                    for block_ref in &block_refs {
                        let entry = limit_rounds
                            .entry(block_ref.author)
                            .or_insert(block_ref.round);
                        *entry = (*entry).min(block_ref.round);
                    }
                }

                // Use a min-heap to fetch blocks across authorities in ascending round order.
                // Each entry is (fetch_start_round, authority, limit_round).
                let mut heap = BinaryHeap::new();
                for (authority, limit_round) in &limit_rounds {
                    // Start just above the round the requester has already accepted.
                    let fetch_start = fetch_after_rounds[*authority] + 1;
                    // NOTE(review): `<` treats limit_round as exclusive here — confirm this
                    // matches scan_blocks_by_author_in_range's range semantics.
                    if fetch_start < *limit_round {
                        heap.push(Reverse((fetch_start, *authority, *limit_round)));
                    }
                }

                // Pop the lowest start round, fetch one block, and re-queue the
                // authority with the next start round until the budget is used
                // or no authority has blocks left in range.
                while let Some(Reverse((fetch_start, authority, limit_round))) = heap.pop() {
                    let fetched = self.store.scan_blocks_by_author_in_range(
                        authority,
                        fetch_start,
                        limit_round,
                        1,
                    )?;
                    if let Some(block) = fetched.into_iter().next() {
                        let next_start = block.round() + 1;
                        blocks.push(block);
                        if blocks.len() >= max_response_num_blocks {
                            blocks.truncate(max_response_num_blocks);
                            break;
                        }
                        if next_start < limit_round {
                            heap.push(Reverse((next_start, authority, limit_round)));
                        }
                    }
                }
            }
        }

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }
553
554    async fn handle_fetch_commits(
555        &self,
556        _peer: AuthorityIndex,
557        commit_range: CommitRange,
558    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
559        fail_point_async!("consensus-rpc-response");
560
561        // Compute an inclusive end index and bound the maximum number of commits scanned.
562        let inclusive_end = commit_range.end().min(
563            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
564                - 1,
565        );
566        let mut commits = self
567            .store
568            .scan_commits((commit_range.start()..=inclusive_end).into())?;
569        let mut certifier_block_refs = vec![];
570        'commit: while let Some(c) = commits.last() {
571            let index = c.index();
572            let votes = self.store.read_commit_votes(index)?;
573            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
574            for v in &votes {
575                stake_aggregator.add(v.author, &self.context.committee);
576            }
577            if stake_aggregator.reached_threshold(&self.context.committee) {
578                certifier_block_refs = votes;
579                break 'commit;
580            } else {
581                debug!(
582                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
583                    index,
584                    stake_aggregator.stake(),
585                    stake_aggregator.threshold(&self.context.committee)
586                );
587                self.context
588                    .metrics
589                    .node_metrics
590                    .commit_sync_fetch_commits_handler_uncertified_skipped
591                    .inc();
592                commits.pop();
593            }
594        }
595        let certifier_blocks = self
596            .store
597            .read_blocks(&certifier_block_refs)?
598            .into_iter()
599            .flatten()
600            .collect();
601        Ok((commits, certifier_blocks))
602    }
603
604    async fn handle_fetch_latest_blocks(
605        &self,
606        peer: AuthorityIndex,
607        authorities: Vec<AuthorityIndex>,
608    ) -> ConsensusResult<Vec<Bytes>> {
609        fail_point_async!("consensus-rpc-response");
610
611        if authorities.len() > self.context.committee.size() {
612            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
613        }
614
615        // Ensure that those are valid authorities
616        for authority in &authorities {
617            if !self.context.committee.is_valid_index(*authority) {
618                return Err(ConsensusError::InvalidAuthorityIndex {
619                    index: *authority,
620                    max: self.context.committee.size(),
621                });
622            }
623        }
624
625        // Read from the dag state to find the latest blocks.
626        // TODO: at the moment we don't look into the block manager for suspended blocks. Ideally we
627        // want in the future if we think we would like to tackle the majority of cases.
628        let mut blocks = vec![];
629        let dag_state = self.dag_state.read();
630        for authority in authorities {
631            let block = dag_state.get_last_block_for_authority(authority);
632
633            debug!("Latest block for {authority}: {block:?} as requested from {peer}");
634
635            // no reason to serve back the genesis block - it's equal as if it has not received any block
636            if block.round() != GENESIS_ROUND {
637                blocks.push(block);
638            }
639        }
640
641        // Return the serialised blocks
642        let result = blocks
643            .into_iter()
644            .map(|block| block.serialized().clone())
645            .collect::<Vec<_>>();
646
647        Ok(result)
648    }
649
650    async fn handle_get_latest_rounds(
651        &self,
652        _peer: AuthorityIndex,
653    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
654        fail_point_async!("consensus-rpc-response");
655
656        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
657
658        let blocks = self
659            .dag_state
660            .read()
661            .get_last_cached_block_per_authority(Round::MAX);
662        let highest_accepted_rounds = blocks
663            .into_iter()
664            .map(|(block, _)| block.round())
665            .collect::<Vec<_>>();
666
667        Ok((highest_received_rounds, highest_accepted_rounds))
668    }
669}
670
671#[async_trait]
672impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
673    async fn handle_block(
674        &self,
675        _peer: PeerId,
676        _item: ObserverBlockStreamItem,
677    ) -> ConsensusResult<()> {
678        // TODO: implement observer block handling, similar to validator block handling.
679        Err(ConsensusError::NetworkRequest(
680            "Observer block handling not yet implemented".to_string(),
681        ))
682    }
683
684    async fn handle_stream_blocks(
685        &self,
686        _peer: NodeId,
687        _highest_round_per_authority: Vec<u64>,
688    ) -> ConsensusResult<ObserverBlockStream> {
689        // TODO: Implement observer block streaming
690        todo!("Observer block streaming not yet implemented")
691    }
692
693    async fn handle_fetch_blocks(
694        &self,
695        _peer: NodeId,
696        _block_refs: Vec<BlockRef>,
697    ) -> ConsensusResult<Vec<Bytes>> {
698        // TODO: implement observer fetch blocks, similar to validator fetch_blocks but
699        // without highest_accepted_rounds.
700        Err(ConsensusError::NetworkRequest(
701            "Observer fetch blocks not yet implemented".to_string(),
702        ))
703    }
704
705    async fn handle_fetch_commits(
706        &self,
707        _peer: NodeId,
708        _commit_range: CommitRange,
709    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
710        // TODO: implement observer fetch commits, similar to validator fetch_commits.
711        Err(ConsensusError::NetworkRequest(
712            "Observer fetch commits not yet implemented".to_string(),
713        ))
714    }
715}
/// Mutex-protected state of [`SubscriptionCounter`].
struct Counter {
    // Total number of active subscriptions across all peers.
    count: usize,
    // Number of active subscriptions per peer.
    subscriptions_by_peer: BTreeMap<PeerId, usize>,
}
720
/// Atomically counts the number of active subscriptions to the block broadcast stream.
pub(crate) struct SubscriptionCounter {
    // Provides committee hostnames and the `subscribed_by` metric.
    context: Arc<Context>,
    // Subscription counts, guarded by a mutex for atomic updates.
    counter: parking_lot::Mutex<Counter>,
}
726
727impl SubscriptionCounter {
728    pub(crate) fn new(context: Arc<Context>) -> Self {
729        // Set the subscribed peers by default to 0
730        for (_, authority) in context.committee.authorities() {
731            context
732                .metrics
733                .node_metrics
734                .subscribed_by
735                .with_label_values(&[authority.hostname.as_str()])
736                .set(0);
737        }
738
739        Self {
740            counter: parking_lot::Mutex::new(Counter {
741                count: 0,
742                subscriptions_by_peer: BTreeMap::new(),
743            }),
744            context,
745        }
746    }
747
748    fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
749        let mut counter = self.counter.lock();
750        counter.count += 1;
751        *counter
752            .subscriptions_by_peer
753            .entry(peer.clone())
754            .or_insert(0) += 1;
755
756        match peer {
757            PeerId::Validator(authority) => {
758                let peer_hostname = &self.context.committee.authority(*authority).hostname;
759                self.context
760                    .metrics
761                    .node_metrics
762                    .subscribed_by
763                    .with_label_values(&[peer_hostname])
764                    .set(1);
765            }
766            PeerId::Observer(_) => {
767                self.context
768                    .metrics
769                    .node_metrics
770                    .subscribed_by
771                    .with_label_values(&["observer"])
772                    .inc();
773            }
774        }
775
776        Ok(())
777    }
778
779    fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
780        let mut counter = self.counter.lock();
781        counter.count -= 1;
782        *counter
783            .subscriptions_by_peer
784            .entry(peer.clone())
785            .or_insert(0) -= 1;
786
787        if counter.subscriptions_by_peer[peer] == 0 {
788            match peer {
789                PeerId::Validator(authority) => {
790                    let peer_hostname = &self.context.committee.authority(*authority).hostname;
791                    self.context
792                        .metrics
793                        .node_metrics
794                        .subscribed_by
795                        .with_label_values(&[peer_hostname])
796                        .set(0);
797                }
798                PeerId::Observer(_) => {
799                    self.context
800                        .metrics
801                        .node_metrics
802                        .subscribed_by
803                        .with_label_values(&["observer"])
804                        .dec();
805                }
806            }
807        }
808
809        Ok(())
810    }
811}
812
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
/// Concretely, a `BroadcastStream` yielding `ExtendedBlock` items.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
816
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
pub(crate) struct BroadcastStream<T> {
    // Identity of the subscriber, used for logging and subscription accounting.
    peer: PeerId,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}
832
833impl<T: 'static + Clone + Send> BroadcastStream<T> {
834    pub fn new(
835        peer: PeerId,
836        rx: broadcast::Receiver<T>,
837        subscription_counter: Arc<SubscriptionCounter>,
838    ) -> Self {
839        if let Err(err) = subscription_counter.increment(&peer) {
840            match err {
841                ConsensusError::Shutdown => {}
842                _ => panic!("Unexpected error: {err}"),
843            }
844        }
845        Self {
846            peer,
847            inner: ReusableBoxFuture::new(make_recv_future(rx)),
848            subscription_counter,
849        }
850    }
851}
852
853impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
854    type Item = T;
855
856    fn poll_next(
857        mut self: Pin<&mut Self>,
858        cx: &mut task::Context<'_>,
859    ) -> task::Poll<Option<Self::Item>> {
860        let peer = self.peer.clone();
861        let maybe_item = loop {
862            let (result, rx) = ready!(self.inner.poll(cx));
863            self.inner.set(make_recv_future(rx));
864
865            match result {
866                Ok(item) => break Some(item),
867                Err(broadcast::error::RecvError::Closed) => {
868                    info!("Block BroadcastedBlockStream {} closed", peer);
869                    break None;
870                }
871                Err(broadcast::error::RecvError::Lagged(n)) => {
872                    warn!(
873                        "Block BroadcastedBlockStream {} lagged by {} messages",
874                        peer, n
875                    );
876                    continue;
877                }
878            }
879        };
880        task::Poll::Ready(maybe_item)
881    }
882}
883
884impl<T> Drop for BroadcastStream<T> {
885    fn drop(&mut self) {
886        if let Err(err) = self.subscription_counter.decrement(&self.peer) {
887            match err {
888                ConsensusError::Shutdown => {}
889                _ => panic!("Unexpected error: {err}"),
890            }
891        }
892    }
893}
894
895async fn make_recv_future<T: Clone>(
896    mut rx: broadcast::Receiver<T>,
897) -> (
898    Result<T, broadcast::error::RecvError>,
899    broadcast::Receiver<T>,
900) {
901    let result = rx.recv().await;
902    (result, rx)
903}
904
905// TODO: add a unit test for BroadcastStream.
906
907#[cfg(test)]
908mod tests {
909    use std::{
910        collections::{BTreeMap, BTreeSet},
911        sync::Arc,
912        time::Duration,
913    };
914
915    use async_trait::async_trait;
916    use bytes::Bytes;
917    use consensus_config::AuthorityIndex;
918    use consensus_types::block::{BlockDigest, BlockRef, Round};
919    use parking_lot::{Mutex, RwLock};
920    use tokio::{sync::broadcast, time::sleep};
921
922    use futures::StreamExt as _;
923
924    use crate::{
925        authority_service::AuthorityService,
926        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
927        commit::{CertifiedCommits, CommitRange},
928        commit_vote_monitor::CommitVoteMonitor,
929        context::Context,
930        core_thread::{CoreError, CoreThreadDispatcher},
931        dag_state::DagState,
932        error::ConsensusResult,
933        network::{
934            BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
935            ValidatorNetworkClient, ValidatorNetworkService,
936        },
937        round_tracker::RoundTracker,
938        storage::mem_store::MemStore,
939        synchronizer::Synchronizer,
940        test_dag_builder::DagBuilder,
941        transaction_vote_tracker::TransactionVoteTracker,
942    };
    // Test double for `CoreThreadDispatcher` that records blocks passed to
    // `add_blocks` so tests can assert on what the service dispatched.
    struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }
946
947    impl FakeCoreThreadDispatcher {
948        fn new() -> Self {
949            Self {
950                blocks: Mutex::new(vec![]),
951            }
952        }
953
954        fn get_blocks(&self) -> Vec<VerifiedBlock> {
955            self.blocks.lock().clone()
956        }
957    }
958
959    #[async_trait]
960    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
961        async fn add_blocks(
962            &self,
963            blocks: Vec<VerifiedBlock>,
964        ) -> Result<BTreeSet<BlockRef>, CoreError> {
965            let block_refs = blocks.iter().map(|b| b.reference()).collect();
966            self.blocks.lock().extend(blocks);
967            Ok(block_refs)
968        }
969
970        async fn check_block_refs(
971            &self,
972            _block_refs: Vec<BlockRef>,
973        ) -> Result<BTreeSet<BlockRef>, CoreError> {
974            Ok(BTreeSet::new())
975        }
976
977        async fn add_certified_commits(
978            &self,
979            _commits: CertifiedCommits,
980        ) -> Result<BTreeSet<BlockRef>, CoreError> {
981            todo!()
982        }
983
984        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
985            Ok(())
986        }
987
988        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
989            Ok(Default::default())
990        }
991
992        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
993            todo!()
994        }
995
996        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
997            todo!()
998        }
999    }
1000
    // Stub network client used to satisfy `SynchronizerClient` construction;
    // every method panics if actually invoked by a test.
    #[derive(Default)]
    struct FakeNetworkClient {}
1003
    #[async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        // All methods below are unreachable stubs: the tests wire this client
        // into the synchronizer but never trigger outbound validator requests.
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _fetch_after_rounds: Vec<Round>,
            _fetch_missing_ancestors: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1061
    #[async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        // Unreachable stubs, as with the validator client impl above: no test
        // here makes outbound observer requests.
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _highest_round_per_authority: Vec<u64>,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
1091
    /// Verifies `handle_send_block`: a block with a future timestamp is
    /// delayed then accepted, while an invalid block and a block carrying
    /// malformed excluded ancestors are rejected.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Submit from a background task: the handler blocks until the
        // drift-induced delay elapses.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        // With the paused clock, sleeping advances virtual time so the
        // spawned handler can complete its delay.
        sleep(max_drift / 2).await;

        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1205
    /// Exercises `handle_fetch_blocks` across its request modes: explicit refs
    /// with ancestor fetching, refs without ancestors, empty refs with
    /// highest-accepted rounds, and refs with no round vector — verifying the
    /// per-request block limits in each case.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN
        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        context.parameters.max_blocks_per_fetch = 50;
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            store,
        ));

        // GIVEN: 40 rounds of blocks in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        dag_state.write().flush();
        let all_blocks = dag_builder.all_blocks();

        // WHEN: Request 2 blocks from round 40, fetch missing ancestors enabled.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Remaining capacity is filled with ancestors from the previous round.
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: Request 2 blocks from round 37, fetch missing ancestors disabled.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Try to fill up the blocks from the 1st authority in missing_block_refs.
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Ancestor blocks are from the expected rounds and authorities.
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: Request with empty block_refs, fetch missing ancestors disabled.
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Set a few authorities to higher accepted rounds.
        highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
        highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                vec![],
                highest_accepted_rounds.clone(),
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
        // Blocks should be from all authorities, within the expected round range.
        for block_ref in blocks.keys() {
            let accepted = highest_accepted_rounds[block_ref.author];
            assert!(block_ref.round > accepted);
        }
        // Blocks should be fetched in ascending round order across authorities,
        // so blocks should have low rounds near the accepted rounds.
        let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
        // With 40 authorities mostly at accepted round 1 and max_blocks_per_fetch=50,
        // the min-heap fills ~1-2 rounds per authority.
        assert!(
            max_round_in_result <= 4,
            "Expected low rounds from fair round-order fetching, got max round {}",
            max_round_in_result
        );

        // WHEN: Request 5 block from round 40, not getting ancestors.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }
1418
    /// Verifies `handle_fetch_latest_blocks` returns the latest (round 10)
    /// blocks for the requested authorities, including equivocating blocks.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        // Every returned block must deserialize cleanly and sit at the
        // highest built round (10).
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1491
    /// Verifies `handle_subscribe_blocks`: subscribing past all proposed
    /// rounds falls back to streaming the last proposed block, while an
    /// earlier `last_received` replays cached own blocks in round order.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
        // Should return last proposed block (round 15) as fallback
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 100)
                .await
                .unwrap();
            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(
                block.round(),
                15,
                "Should return last proposed block as fallback"
            );
            assert_eq!(block.author().value(), 0);
        }

        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
        // Should return cached blocks from round 8+
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 7)
                .await
                .unwrap();

            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block1.round(), 10, "Should return block at round 10");

            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block2.round(), 15, "Should return block at round 15");
        }
    }
1580
1581    #[tokio::test(flavor = "current_thread", start_paused = true)]
1582    async fn test_handle_subscribe_blocks_not_proposed() {
1583        let (context, _keys) = Context::new_for_test(4);
1584        let context = Arc::new(context);
1585        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1586        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1587        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1588        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1589        let fake_client = Arc::new(FakeNetworkClient::default());
1590        let network_client = Arc::new(SynchronizerClient::new(
1591            context.clone(),
1592            Some(fake_client.clone()),
1593            Some(fake_client.clone()),
1594        ));
1595        let store = Arc::new(MemStore::new());
1596        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1597        let transaction_vote_tracker =
1598            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1599        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1600        let synchronizer = Synchronizer::start(
1601            network_client,
1602            context.clone(),
1603            core_dispatcher.clone(),
1604            commit_vote_monitor.clone(),
1605            block_verifier.clone(),
1606            transaction_vote_tracker.clone(),
1607            round_tracker.clone(),
1608            dag_state.clone(),
1609            false,
1610        );
1611
1612        // No blocks added to DagState - only genesis exists
1613
1614        let authority_service = Arc::new(AuthorityService::new(
1615            context.clone(),
1616            block_verifier,
1617            commit_vote_monitor,
1618            round_tracker,
1619            synchronizer,
1620            core_dispatcher.clone(),
1621            rx_block_broadcast,
1622            transaction_vote_tracker,
1623            dag_state.clone(),
1624            store,
1625        ));
1626
1627        let peer = context.committee.to_authority_index(1).unwrap();
1628
1629        // Subscribe - no blocks have been proposed yet (only genesis exists)
1630        let mut stream = authority_service
1631            .handle_subscribe_blocks(peer, 0)
1632            .await
1633            .unwrap();
1634
1635        // Should NOT receive any block (genesis must not be returned)
1636        use futures::poll;
1637        use std::task::Poll;
1638        let poll_result = poll!(stream.next());
1639        assert!(
1640            matches!(poll_result, Poll::Pending),
1641            "Should not receive genesis block on subscription stream"
1642        );
1643    }
1644}