// consensus_core/authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::Reverse,
6    collections::{BTreeMap, BTreeSet, BinaryHeap},
7    pin::Pin,
8    sync::Arc,
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockRef, Round};
16use futures::{Stream, StreamExt, ready, stream, task};
17use mysten_metrics::spawn_monitored_task;
18use parking_lot::RwLock;
19use rand::seq::SliceRandom as _;
20use sui_macros::fail_point_async;
21use tap::TapFallible;
22use tokio::sync::broadcast;
23use tokio_util::sync::ReusableBoxFuture;
24use tracing::{debug, info, warn};
25
26use crate::{
27    CommitIndex,
28    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
29    block_verifier::BlockVerifier,
30    commit::{CommitAPI as _, CommitRange, TrustedCommit},
31    commit_vote_monitor::CommitVoteMonitor,
32    context::Context,
33    core_thread::CoreThreadDispatcher,
34    dag_state::DagState,
35    error::{ConsensusError, ConsensusResult},
36    network::{
37        BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream, ObserverBlockStreamItem,
38        ObserverNetworkService, PeerId, ValidatorNetworkService,
39    },
40    round_tracker::RoundTracker,
41    stake_aggregator::{QuorumThreshold, StakeAggregator},
42    storage::Store,
43    synchronizer::SynchronizerHandle,
44    transaction_vote_tracker::TransactionVoteTracker,
45};
46
/// Multiplier applied to `commit_sync_batch_size` in `handle_send_block` to compute the
/// maximum tolerated gap between the local commit index and the quorum commit index;
/// blocks are rejected when the local commit lags further than this, to bound the memory
/// used by suspended blocks while commit sync catches up.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
48
/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    // Observes commit votes in received blocks to estimate the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Verifies and votes on incoming signed blocks.
    block_verifier: Arc<dyn BlockVerifier>,
    // Queues background fetches of missing (excluded) ancestors from peers.
    synchronizer: Arc<SynchronizerHandle>,
    // Forwards verified blocks to Core for acceptance into the DAG.
    core_dispatcher: Arc<C>,
    // Broadcast source of locally proposed blocks; resubscribed per peer block stream.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    // Tracks active block-stream subscriptions per peer; shared with broadcast streams.
    subscription_counter: Arc<SubscriptionCounter>,
    // Records this authority's transaction reject votes on verified blocks.
    transaction_vote_tracker: TransactionVoteTracker,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
    // Tracks received and accepted rounds per authority, updated from verified blocks.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
63
64impl<C: CoreThreadDispatcher> AuthorityService<C> {
65    pub(crate) fn new(
66        context: Arc<Context>,
67        block_verifier: Arc<dyn BlockVerifier>,
68        commit_vote_monitor: Arc<CommitVoteMonitor>,
69        round_tracker: Arc<RwLock<RoundTracker>>,
70        synchronizer: Arc<SynchronizerHandle>,
71        core_dispatcher: Arc<C>,
72        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
73        transaction_vote_tracker: TransactionVoteTracker,
74        dag_state: Arc<RwLock<DagState>>,
75        store: Arc<dyn Store>,
76    ) -> Self {
77        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
78        Self {
79            context,
80            block_verifier,
81            commit_vote_monitor,
82            synchronizer,
83            core_dispatcher,
84            rx_block_broadcast,
85            subscription_counter,
86            transaction_vote_tracker,
87            dag_state,
88            store,
89            round_tracker,
90        }
91    }
92
93    // Parses and validates serialized excluded ancestors.
94    fn parse_excluded_ancestors(
95        &self,
96        peer: AuthorityIndex,
97        block: &VerifiedBlock,
98        mut excluded_ancestors: Vec<Vec<u8>>,
99    ) -> ConsensusResult<Vec<BlockRef>> {
100        let peer_hostname = &self.context.committee.authority(peer).hostname;
101
102        let excluded_ancestors_limit = self.context.committee.size() * 2;
103        if excluded_ancestors.len() > excluded_ancestors_limit {
104            debug!(
105                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
106                excluded_ancestors.len() - excluded_ancestors_limit,
107                peer,
108                peer_hostname,
109            );
110            excluded_ancestors.truncate(excluded_ancestors_limit);
111        }
112
113        let excluded_ancestors = excluded_ancestors
114            .into_iter()
115            .map(|serialized| {
116                let block_ref: BlockRef =
117                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
118                if !self.context.committee.is_valid_index(block_ref.author) {
119                    return Err(ConsensusError::InvalidAuthorityIndex {
120                        index: block_ref.author,
121                        max: self.context.committee.size(),
122                    });
123                }
124                if block_ref.round >= block.round() {
125                    return Err(ConsensusError::InvalidAncestorRound {
126                        ancestor: block_ref.round,
127                        block: block.round(),
128                    });
129                }
130                Ok(block_ref)
131            })
132            .collect::<ConsensusResult<Vec<BlockRef>>>()?;
133
134        for excluded_ancestor in &excluded_ancestors {
135            let excluded_ancestor_hostname = &self
136                .context
137                .committee
138                .authority(excluded_ancestor.author)
139                .hostname;
140            self.context
141                .metrics
142                .node_metrics
143                .network_excluded_ancestors_count_by_authority
144                .with_label_values(&[excluded_ancestor_hostname])
145                .inc();
146        }
147        self.context
148            .metrics
149            .node_metrics
150            .network_received_excluded_ancestors_from_authority
151            .with_label_values(&[peer_hostname])
152            .inc_by(excluded_ancestors.len() as u64);
153
154        Ok(excluded_ancestors)
155    }
156}
157
#[async_trait]
impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
    /// Handles a block pushed from `peer`: verifies authorship and contents, records
    /// commit votes and round information, rejects the block when the local commit is
    /// lagging too far behind quorum, records own transaction votes, forwards the block
    /// to Core, and schedules background fetches for any missing ancestors.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing parsing and validations.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // Forward drift: how far the block's timestamp is ahead of the local clock
        // (saturates to 0 when the block timestamp is in the past). Recorded as a
        // metric only; the block is not delayed here.
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the block for the commit votes. When local commit is lagging too much,
        // commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Update own received rounds and peer accepted rounds from this verified block.
        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject blocks when local commit index is lagging too far from quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // The block is verified and current, so record own votes on the block
        // before sending the block to Core.
        if self.context.protocol_config.transaction_voting_enabled() {
            self.transaction_vote_tracker
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Send the block to Core to try accepting it into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_ancestors, PeerId::Validator(peer))
                    .await
                {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Schedule fetching missing soft links from this peer in the background.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, PeerId::Validator(peer))
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }

    /// Subscribes `peer` to this authority's proposed blocks, starting after round
    /// `last_received`. Returns a stream that first replays past proposed blocks from
    /// the DagState cache, then yields newly broadcasted blocks.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        // Find past proposed blocks as the initial blocks to send to the peer.
        //
        // If there are cached blocks in the range which the peer requested, send all of them.
        // The size is bounded by the local GC round and DagState cache size.
        //
        // Otherwise if there is no cached block in the range which the peer requested,
        // and this node has proposed blocks before, at least one block should be sent to the peer
        // to help with liveness.
        let past_proposed_blocks = {
            let dag_state = self.dag_state.read();

            let mut proposed_blocks =
                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
            if proposed_blocks.is_empty() {
                let last_proposed_block = dag_state
                    .get_last_proposed_block()
                    .expect("Last proposed block should be returned on validators");
                // Only resend the last proposed block if it is a real proposal;
                // a genesis block carries no useful information for the peer.
                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
                    vec![last_proposed_block]
                } else {
                    vec![]
                };
            }
            // Past blocks are sent without excluded ancestors.
            stream::iter(
                proposed_blocks
                    .into_iter()
                    .map(|block| ExtendedSerializedBlock {
                        block: block.serialized().clone(),
                        excluded_ancestors: vec![],
                    }),
            )
        };

        let broadcasted_blocks = BroadcastedBlockStream::new(
            PeerId::Validator(peer),
            self.rx_block_broadcast.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
        Ok(Box::pin(past_proposed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles 3 types of requests:
    // 1. Live sync:
    //    - Both missing block refs and highest accepted rounds are specified.
    //    - fetch_missing_ancestors is true.
    //    - response returns max_blocks_per_sync blocks.
    // 2. Periodic sync:
    //    - Highest accepted rounds must be specified.
    //    - Missing block refs are optional.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    // 3. Commit sync:
    //    - Missing block refs are specified.
    //    - Highest accepted rounds are empty.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // Validate the request shape against the three supported modes above.
        if block_refs.is_empty() && (fetch_missing_ancestors || fetch_after_rounds.is_empty()) {
            return Err(ConsensusError::InvalidFetchBlocksRequest("When no block refs are provided, fetch_after_rounds must be provided and fetch_missing_ancestors must be false".to_string()));
        }
        // When provided, fetch_after_rounds must have one entry per committee member.
        if !fetch_after_rounds.is_empty()
            && fetch_after_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                fetch_after_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Finds the suitable limit of # of blocks to return.
        let max_response_num_blocks = if !fetch_after_rounds.is_empty() && !block_refs.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get the requested blocks first.
        let mut blocks = self
            .dag_state
            .read()
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        // When fetch_missing_ancestors is true, fetch missing ancestors of the requested blocks.
        // Otherwise, fetch additional blocks depth-first from the requested block authorities.
        if blocks.len() < max_response_num_blocks && !fetch_after_rounds.is_empty() {
            if fetch_missing_ancestors {
                // Get unique missing ancestor blocks of the requested blocks (validated to be non-empty).
                // fetch_after_rounds will only be used to filter out already accepted blocks.
                let missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| fetch_after_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks
                    .saturating_sub(blocks.len())
                    .min(missing_ancestors.len());
                if selected_num_blocks > 0 {
                    let selected_ancestor_refs = missing_ancestors
                        .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                    let ancestor_blocks = self
                        .dag_state
                        .read()
                        .get_blocks(&selected_ancestor_refs)
                        .into_iter()
                        .flatten();
                    blocks.extend(ancestor_blocks);
                }
            } else {
                // Get additional blocks from authorities with missing block.
                // Compute the fetch round per requested authority, or all authorities.
                let mut limit_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                if block_refs.is_empty() {
                    // No specific refs requested: scan up to each authority's last known round.
                    let dag_state = self.dag_state.read();
                    for (index, _authority) in self.context.committee.authorities() {
                        let last_block = dag_state.get_last_block_for_authority(index);
                        limit_rounds.insert(index, last_block.round());
                    }
                } else {
                    // Limit each authority's scan to the lowest requested round for it.
                    for block_ref in &block_refs {
                        let entry = limit_rounds
                            .entry(block_ref.author)
                            .or_insert(block_ref.round);
                        *entry = (*entry).min(block_ref.round);
                    }
                }

                // Use a min-heap to fetch blocks across authorities in ascending round order.
                // Each entry is (fetch_start_round, authority, limit_round).
                //
                // NOTE(review): the `<` comparisons below assume `limit_round` is an
                // exclusive upper bound for `scan_blocks_by_author_in_range` — confirm
                // against the Store implementation.
                let mut heap = BinaryHeap::new();
                for (authority, limit_round) in &limit_rounds {
                    let fetch_start = fetch_after_rounds[*authority] + 1;
                    if fetch_start < *limit_round {
                        heap.push(Reverse((fetch_start, *authority, *limit_round)));
                    }
                }

                // Pop the lowest pending round, fetch one block, and re-queue the
                // authority at the next round until the response is full.
                while let Some(Reverse((fetch_start, authority, limit_round))) = heap.pop() {
                    let fetched = self.store.scan_blocks_by_author_in_range(
                        authority,
                        fetch_start,
                        limit_round,
                        1,
                    )?;
                    if let Some(block) = fetched.into_iter().next() {
                        let next_start = block.round() + 1;
                        blocks.push(block);
                        if blocks.len() >= max_response_num_blocks {
                            blocks.truncate(max_response_num_blocks);
                            break;
                        }
                        if next_start < limit_round {
                            heap.push(Reverse((next_start, authority, limit_round)));
                        }
                    }
                }
            }
        }

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

    /// Returns commits in `commit_range`, bounded by `commit_sync_batch_size`. Trailing
    /// commits whose recorded votes do not reach a quorum are trimmed off, and the
    /// blocks carrying the votes certifying the last returned commit are included.
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        // Walk backwards from the last commit, dropping commits until one is found
        // whose votes reach a quorum; its voters certify every earlier commit too.
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

    /// Returns the latest (highest round) known block for each of the requested
    /// `authorities`, excluding genesis blocks.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that those are valid authorities
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended blocks. Ideally we
        // want in the future if we think we would like to tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // no reason to serve back the genesis block - it's equal as if it has not received any block
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialised blocks
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    /// Returns this node's highest received round and highest accepted round for every
    /// authority, as two vectors indexed by authority.
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();

        // Round::MAX places no upper bound on the cached blocks considered.
        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}
675
676#[async_trait]
677impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
678    async fn handle_block(
679        &self,
680        _peer: PeerId,
681        _item: ObserverBlockStreamItem,
682    ) -> ConsensusResult<()> {
683        // TODO: implement observer block handling, similar to validator block handling.
684        Err(ConsensusError::NetworkRequest(
685            "Observer block handling not yet implemented".to_string(),
686        ))
687    }
688
689    async fn handle_stream_blocks(
690        &self,
691        _peer: NodeId,
692        _highest_round_per_authority: Vec<u64>,
693    ) -> ConsensusResult<ObserverBlockStream> {
694        // TODO: Implement observer block streaming
695        todo!("Observer block streaming not yet implemented")
696    }
697
698    async fn handle_fetch_blocks(
699        &self,
700        _peer: NodeId,
701        _block_refs: Vec<BlockRef>,
702    ) -> ConsensusResult<Vec<Bytes>> {
703        // TODO: implement observer fetch blocks, similar to validator fetch_blocks but
704        // without highest_accepted_rounds.
705        Err(ConsensusError::NetworkRequest(
706            "Observer fetch blocks not yet implemented".to_string(),
707        ))
708    }
709
710    async fn handle_fetch_commits(
711        &self,
712        _peer: NodeId,
713        _commit_range: CommitRange,
714    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
715        // TODO: implement observer fetch commits, similar to validator fetch_commits.
716        Err(ConsensusError::NetworkRequest(
717            "Observer fetch commits not yet implemented".to_string(),
718        ))
719    }
720}
/// Mutable state guarded by the `SubscriptionCounter` mutex.
struct Counter {
    // Total number of active subscriptions across all peers.
    count: usize,
    // Number of active subscriptions held by each individual peer.
    subscriptions_by_peer: BTreeMap<PeerId, usize>,
}
725
/// Atomically counts the number of active subscriptions to the block broadcast stream.
pub(crate) struct SubscriptionCounter {
    // Provides committee info and the `subscribed_by` metric.
    context: Arc<Context>,
    // Aggregate and per-peer subscription counts, protected by a mutex.
    counter: parking_lot::Mutex<Counter>,
}
731
732impl SubscriptionCounter {
733    pub(crate) fn new(context: Arc<Context>) -> Self {
734        // Set the subscribed peers by default to 0
735        for (_, authority) in context.committee.authorities() {
736            context
737                .metrics
738                .node_metrics
739                .subscribed_by
740                .with_label_values(&[authority.hostname.as_str()])
741                .set(0);
742        }
743
744        Self {
745            counter: parking_lot::Mutex::new(Counter {
746                count: 0,
747                subscriptions_by_peer: BTreeMap::new(),
748            }),
749            context,
750        }
751    }
752
753    fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
754        let mut counter = self.counter.lock();
755        counter.count += 1;
756        *counter
757            .subscriptions_by_peer
758            .entry(peer.clone())
759            .or_insert(0) += 1;
760
761        match peer {
762            PeerId::Validator(authority) => {
763                let peer_hostname = &self.context.committee.authority(*authority).hostname;
764                self.context
765                    .metrics
766                    .node_metrics
767                    .subscribed_by
768                    .with_label_values(&[peer_hostname])
769                    .set(1);
770            }
771            PeerId::Observer(_) => {
772                self.context
773                    .metrics
774                    .node_metrics
775                    .subscribed_by
776                    .with_label_values(&["observer"])
777                    .inc();
778            }
779        }
780
781        Ok(())
782    }
783
784    fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
785        let mut counter = self.counter.lock();
786        counter.count -= 1;
787        *counter
788            .subscriptions_by_peer
789            .entry(peer.clone())
790            .or_insert(0) -= 1;
791
792        if counter.subscriptions_by_peer[peer] == 0 {
793            match peer {
794                PeerId::Validator(authority) => {
795                    let peer_hostname = &self.context.committee.authority(*authority).hostname;
796                    self.context
797                        .metrics
798                        .node_metrics
799                        .subscribed_by
800                        .with_label_values(&[peer_hostname])
801                        .set(0);
802                }
803                PeerId::Observer(_) => {
804                    self.context
805                        .metrics
806                        .node_metrics
807                        .subscribed_by
808                        .with_label_values(&["observer"])
809                        .dec();
810                }
811            }
812        }
813
814        Ok(())
815    }
816}
817
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
/// Receiver lag is tolerated with a warning log instead of an error item.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
821
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
pub(crate) struct BroadcastStream<T> {
    // Peer this stream serves; used for logging and subscription accounting.
    peer: PeerId,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}
837
838impl<T: 'static + Clone + Send> BroadcastStream<T> {
839    pub fn new(
840        peer: PeerId,
841        rx: broadcast::Receiver<T>,
842        subscription_counter: Arc<SubscriptionCounter>,
843    ) -> Self {
844        if let Err(err) = subscription_counter.increment(&peer) {
845            match err {
846                ConsensusError::Shutdown => {}
847                _ => panic!("Unexpected error: {err}"),
848            }
849        }
850        Self {
851            peer,
852            inner: ReusableBoxFuture::new(make_recv_future(rx)),
853            subscription_counter,
854        }
855    }
856}
857
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer.clone();
        let maybe_item = loop {
            // Poll the stored recv future; it resolves with both the result and
            // the receiver, which is immediately re-armed for the next poll.
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    // Sender dropped: terminate the stream.
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    // Lag is tolerated: log and keep polling rather than
                    // surfacing an error item to the consumer.
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}
888
889impl<T> Drop for BroadcastStream<T> {
890    fn drop(&mut self) {
891        if let Err(err) = self.subscription_counter.decrement(&self.peer) {
892            match err {
893                ConsensusError::Shutdown => {}
894                _ => panic!("Unexpected error: {err}"),
895            }
896        }
897    }
898}
899
900async fn make_recv_future<T: Clone>(
901    mut rx: broadcast::Receiver<T>,
902) -> (
903    Result<T, broadcast::error::RecvError>,
904    broadcast::Receiver<T>,
905) {
906    let result = rx.recv().await;
907    (result, rx)
908}
909
910// TODO: add a unit test for BroadcastStream.
911
912#[cfg(test)]
913mod tests {
914    use std::{
915        collections::{BTreeMap, BTreeSet},
916        sync::Arc,
917        time::Duration,
918    };
919
920    use async_trait::async_trait;
921    use bytes::Bytes;
922    use consensus_config::AuthorityIndex;
923    use consensus_types::block::{BlockDigest, BlockRef, Round};
924    use parking_lot::{Mutex, RwLock};
925    use tokio::{sync::broadcast, time::sleep};
926
927    use futures::StreamExt as _;
928
929    use crate::{
930        authority_service::AuthorityService,
931        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
932        commit::{CertifiedCommits, CommitRange},
933        commit_vote_monitor::CommitVoteMonitor,
934        context::Context,
935        core_thread::{CoreError, CoreThreadDispatcher},
936        dag_state::DagState,
937        error::ConsensusResult,
938        network::{
939            BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
940            ValidatorNetworkClient, ValidatorNetworkService,
941        },
942        peers_pool::PeersPool,
943        round_tracker::RoundTracker,
944        storage::mem_store::MemStore,
945        synchronizer::Synchronizer,
946        test_dag_builder::DagBuilder,
947        transaction_vote_tracker::TransactionVoteTracker,
948    };
    /// Test double for `CoreThreadDispatcher` that records received blocks.
    struct FakeCoreThreadDispatcher {
        // Blocks passed to `add_blocks`, in arrival order.
        blocks: Mutex<Vec<VerifiedBlock>>,
    }
952
953    impl FakeCoreThreadDispatcher {
954        fn new() -> Self {
955            Self {
956                blocks: Mutex::new(vec![]),
957            }
958        }
959
960        fn get_blocks(&self) -> Vec<VerifiedBlock> {
961            self.blocks.lock().clone()
962        }
963    }
964
965    #[async_trait]
966    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
967        async fn add_blocks(
968            &self,
969            blocks: Vec<VerifiedBlock>,
970        ) -> Result<BTreeSet<BlockRef>, CoreError> {
971            let block_refs = blocks.iter().map(|b| b.reference()).collect();
972            self.blocks.lock().extend(blocks);
973            Ok(block_refs)
974        }
975
976        async fn check_block_refs(
977            &self,
978            _block_refs: Vec<BlockRef>,
979        ) -> Result<BTreeSet<BlockRef>, CoreError> {
980            Ok(BTreeSet::new())
981        }
982
983        async fn add_certified_commits(
984            &self,
985            _commits: CertifiedCommits,
986        ) -> Result<BTreeSet<BlockRef>, CoreError> {
987            todo!()
988        }
989
990        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
991            Ok(())
992        }
993
994        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
995            Ok(Default::default())
996        }
997
998        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
999            todo!()
1000        }
1001
1002        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
1003            todo!()
1004        }
1005    }
1006
    // Stub network client for tests; every method panics if invoked.
    #[derive(Default)]
    struct FakeNetworkClient {}
1009
    // None of these tests drive outbound validator networking, so every
    // method is left unimplemented and panics if reached.
    #[async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _fetch_after_rounds: Vec<Round>,
            _fetch_missing_ancestors: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1067
    // None of these tests drive outbound observer networking, so every
    // method is left unimplemented and panics if reached.
    #[async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _highest_round_per_authority: Vec<u64>,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
1097
    /// Verifies `handle_send_block`: a block with a future timestamp is still
    /// accepted (after a delay), while an invalid block or invalid excluded
    /// ancestors are rejected with an error.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        // Standard service wiring over in-memory store and fake network/core.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        // The block timestamp is max_forward_time_drift in the future.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Submit the block from a separate task; handle_send_block is expected
        // to complete successfully despite the forward time drift.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        // start_paused = true: tokio auto-advances the paused clock while idle,
        // letting the handler's internal delay elapse during this sleep.
        sleep(max_drift / 2).await;

        // The block should have reached the core dispatcher.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block.
        // Author index 1000 is out of range for a 4-authority committee.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors.
        // Mix of: out-of-range authority, undecodable bytes, and a ref to an
        // invalid block — all attached to an otherwise valid block.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1213
1214    #[tokio::test(flavor = "current_thread", start_paused = true)]
1215    async fn test_handle_fetch_blocks() {
1216        // GIVEN
1217        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
1218        const NUM_AUTHORITIES: usize = 40;
1219        const NUM_ROUNDS: usize = 40;
1220        let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1221        context.parameters.max_blocks_per_fetch = 50;
1222        let context = Arc::new(context);
1223        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1224        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1225        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1226        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1227        let fake_client = Arc::new(FakeNetworkClient::default());
1228        let network_client = Arc::new(SynchronizerClient::new(
1229            context.clone(),
1230            Some(fake_client.clone()),
1231            Some(fake_client.clone()),
1232        ));
1233        let store = Arc::new(MemStore::new());
1234        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1235        let transaction_vote_tracker =
1236            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1237        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1238        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1239        let synchronizer = Synchronizer::start(
1240            network_client,
1241            context.clone(),
1242            core_dispatcher.clone(),
1243            commit_vote_monitor.clone(),
1244            block_verifier.clone(),
1245            transaction_vote_tracker.clone(),
1246            round_tracker.clone(),
1247            dag_state.clone(),
1248            peers_pool.clone(),
1249            false,
1250        );
1251        let authority_service = Arc::new(AuthorityService::new(
1252            context.clone(),
1253            block_verifier,
1254            commit_vote_monitor,
1255            round_tracker,
1256            synchronizer,
1257            core_dispatcher.clone(),
1258            rx_block_broadcast,
1259            transaction_vote_tracker,
1260            dag_state.clone(),
1261            store,
1262        ));
1263
1264        // GIVEN: 40 rounds of blocks in the dag state.
1265        let mut dag_builder = DagBuilder::new(context.clone());
1266        dag_builder
1267            .layers(1..=(NUM_ROUNDS as u32))
1268            .build()
1269            .persist_layers(dag_state.clone());
1270        dag_state.write().flush();
1271        let all_blocks = dag_builder.all_blocks();
1272
1273        // WHEN: Request 2 blocks from round 40, fetch missing ancestors enabled.
1274        let missing_block_refs: Vec<BlockRef> = all_blocks
1275            .iter()
1276            .rev()
1277            .take(2)
1278            .map(|b| b.reference())
1279            .collect();
1280        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1281        let results = authority_service
1282            .handle_fetch_blocks(
1283                AuthorityIndex::new_for_test(0),
1284                missing_block_refs.clone(),
1285                highest_accepted_rounds,
1286                true,
1287            )
1288            .await
1289            .unwrap();
1290
1291        // THEN: the expected number of unique blocks are returned.
1292        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1293            .iter()
1294            .map(|b| {
1295                let signed = bcs::from_bytes(b).unwrap();
1296                let block = VerifiedBlock::new_verified(signed, b.clone());
1297                (block.reference(), block)
1298            })
1299            .collect();
1300        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1301        // All missing blocks are returned.
1302        for b in &missing_block_refs {
1303            assert!(blocks.contains_key(b));
1304        }
1305        let num_missing_ancestors = blocks
1306            .keys()
1307            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1308            .count();
1309        assert_eq!(
1310            num_missing_ancestors,
1311            context.parameters.max_blocks_per_sync - missing_block_refs.len()
1312        );
1313
1314        // WHEN: Request 2 blocks from round 37, fetch missing ancestors disabled.
1315        let missing_round = NUM_ROUNDS as Round - 3;
1316        let missing_block_refs: Vec<BlockRef> = all_blocks
1317            .iter()
1318            .filter(|b| b.reference().round == missing_round)
1319            .map(|b| b.reference())
1320            .take(2)
1321            .collect();
1322        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1323        // Try to fill up the blocks from the 1st authority in missing_block_refs.
1324        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1325        let results = authority_service
1326            .handle_fetch_blocks(
1327                AuthorityIndex::new_for_test(0),
1328                missing_block_refs.clone(),
1329                highest_accepted_rounds,
1330                false,
1331            )
1332            .await
1333            .unwrap();
1334
1335        // THEN: the expected number of unique blocks are returned.
1336        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1337            .iter()
1338            .map(|b| {
1339                let signed = bcs::from_bytes(b).unwrap();
1340                let block = VerifiedBlock::new_verified(signed, b.clone());
1341                (block.reference(), block)
1342            })
1343            .collect();
1344        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1345        // All missing blocks are returned.
1346        for b in &missing_block_refs {
1347            assert!(blocks.contains_key(b));
1348        }
1349        // Ancestor blocks are from the expected rounds and authorities.
1350        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1351        for b in blocks.keys() {
1352            assert!(b.round <= missing_round);
1353            assert!(expected_authors.contains(&b.author));
1354        }
1355
1356        // WHEN: Request with empty block_refs, fetch missing ancestors disabled.
1357        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1358        // Set a few authorities to higher accepted rounds.
1359        highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
1360        highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
1361        let results = authority_service
1362            .handle_fetch_blocks(
1363                AuthorityIndex::new_for_test(0),
1364                vec![],
1365                highest_accepted_rounds.clone(),
1366                false,
1367            )
1368            .await
1369            .unwrap();
1370
1371        // THEN: the expected number of unique blocks are returned.
1372        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1373            .iter()
1374            .map(|b| {
1375                let signed = bcs::from_bytes(b).unwrap();
1376                let block = VerifiedBlock::new_verified(signed, b.clone());
1377                (block.reference(), block)
1378            })
1379            .collect();
1380        assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
1381        // Blocks should be from all authorities, within the expected round range.
1382        for block_ref in blocks.keys() {
1383            let accepted = highest_accepted_rounds[block_ref.author];
1384            assert!(block_ref.round > accepted);
1385        }
1386        // Blocks should be fetched in ascending round order across authorities,
1387        // so blocks should have low rounds near the accepted rounds.
1388        let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
1389        // With 40 authorities mostly at accepted round 1 and max_blocks_per_fetch=50,
1390        // the min-heap fills ~1-2 rounds per authority.
1391        assert!(
1392            max_round_in_result <= 4,
1393            "Expected low rounds from fair round-order fetching, got max round {}",
1394            max_round_in_result
1395        );
1396
1397        // WHEN: Request 5 block from round 40, not getting ancestors.
1398        let missing_block_refs: Vec<BlockRef> = all_blocks
1399            .iter()
1400            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1401            .map(|b| b.reference())
1402            .take(5)
1403            .collect();
1404        let results = authority_service
1405            .handle_fetch_blocks(
1406                AuthorityIndex::new_for_test(0),
1407                missing_block_refs.clone(),
1408                vec![],
1409                false,
1410            )
1411            .await
1412            .unwrap();
1413
1414        // THEN: the expected number of unique blocks are returned.
1415        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1416            .iter()
1417            .map(|b| {
1418                let signed = bcs::from_bytes(b).unwrap();
1419                let block = VerifiedBlock::new_verified(signed, b.clone());
1420                (block.reference(), block)
1421            })
1422            .collect();
1423        assert_eq!(blocks.len(), 5);
1424        for b in &missing_block_refs {
1425            assert!(blocks.contains_key(b));
1426        }
1427    }
1428
    /// Verifies `handle_fetch_latest_blocks` returns the latest (round 10)
    /// blocks for the requested authorities, including equivocating blocks.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        // Standard service wiring over in-memory store and fake network/core.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        // Every returned block should be from the highest round built (10).
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1503
    /// Verifies `handle_subscribe_blocks`: a subscriber that is already ahead
    /// gets the last proposed block as a fallback, while a lagging subscriber
    /// gets all cached own blocks after its last received round.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks() {
        // Standard service wiring over in-memory store and fake network/core.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );

        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
        // Should return last proposed block (round 15) as fallback
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 100)
                .await
                .unwrap();
            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(
                block.round(),
                15,
                "Should return last proposed block as fallback"
            );
            assert_eq!(block.author().value(), 0);
        }

        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
        // Should return cached blocks from round 8+
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 7)
                .await
                .unwrap();

            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block1.round(), 10, "Should return block at round 10");

            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block2.round(), 15, "Should return block at round 15");
        }
    }
1594
1595    #[tokio::test(flavor = "current_thread", start_paused = true)]
1596    async fn test_handle_subscribe_blocks_not_proposed() {
1597        let (context, _keys) = Context::new_for_test(4);
1598        let context = Arc::new(context);
1599        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1600        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1601        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1602        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1603        let fake_client = Arc::new(FakeNetworkClient::default());
1604        let network_client = Arc::new(SynchronizerClient::new(
1605            context.clone(),
1606            Some(fake_client.clone()),
1607            Some(fake_client.clone()),
1608        ));
1609        let store = Arc::new(MemStore::new());
1610        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1611        let transaction_vote_tracker =
1612            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1613        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1614        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1615        let synchronizer = Synchronizer::start(
1616            network_client,
1617            context.clone(),
1618            core_dispatcher.clone(),
1619            commit_vote_monitor.clone(),
1620            block_verifier.clone(),
1621            transaction_vote_tracker.clone(),
1622            round_tracker.clone(),
1623            dag_state.clone(),
1624            peers_pool.clone(),
1625            false,
1626        );
1627
1628        // No blocks added to DagState - only genesis exists
1629
1630        let authority_service = Arc::new(AuthorityService::new(
1631            context.clone(),
1632            block_verifier,
1633            commit_vote_monitor,
1634            round_tracker,
1635            synchronizer,
1636            core_dispatcher.clone(),
1637            rx_block_broadcast,
1638            transaction_vote_tracker,
1639            dag_state.clone(),
1640            store,
1641        ));
1642
1643        let peer = context.committee.to_authority_index(1).unwrap();
1644
1645        // Subscribe - no blocks have been proposed yet (only genesis exists)
1646        let mut stream = authority_service
1647            .handle_subscribe_blocks(peer, 0)
1648            .await
1649            .unwrap();
1650
1651        // Should NOT receive any block (genesis must not be returned)
1652        use futures::poll;
1653        use std::task::Poll;
1654        let poll_result = poll!(stream.next());
1655        assert!(
1656            matches!(poll_result, Poll::Pending),
1657            "Should not receive genesis block on subscription stream"
1658        );
1659    }
1660}