consensus_core/
authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    pin::Pin,
7    sync::Arc,
8    time::Duration,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use consensus_types::block::{BlockRef, Round};
15use futures::{Stream, StreamExt, ready, stream, task};
16use mysten_metrics::spawn_monitored_task;
17use parking_lot::RwLock;
18use rand::seq::SliceRandom as _;
19use sui_macros::fail_point_async;
20use tap::TapFallible;
21use tokio::sync::broadcast;
22use tokio_util::sync::ReusableBoxFuture;
23use tracing::{debug, info, warn};
24
25use crate::{
26    CommitIndex,
27    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
28    block_verifier::BlockVerifier,
29    commit::{CommitAPI as _, CommitRange, TrustedCommit},
30    commit_vote_monitor::CommitVoteMonitor,
31    context::Context,
32    core_thread::CoreThreadDispatcher,
33    dag_state::DagState,
34    error::{ConsensusError, ConsensusResult},
35    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
36    round_tracker::RoundTracker,
37    stake_aggregator::{QuorumThreshold, StakeAggregator},
38    storage::Store,
39    synchronizer::SynchronizerHandle,
40    transaction_certifier::TransactionCertifier,
41};
42
43pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
44
45/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    /// Observes commit votes carried by received blocks and estimates the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Verifies signatures and contents of received blocks before acceptance.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Used to schedule background fetches of missing (excluded) ancestors.
    synchronizer: Arc<SynchronizerHandle>,
    /// Dispatches verified blocks to the Core thread for DAG acceptance.
    core_dispatcher: Arc<C>,
    /// Broadcast channel of locally proposed blocks; resubscribed per peer subscription.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    /// Tracks the number of active block-stream subscriptions per peer.
    subscription_counter: Arc<SubscriptionCounter>,
    /// Records own reject votes on transactions when mysticeti fastpath is enabled.
    transaction_certifier: TransactionCertifier,
    /// Local DAG state: cached blocks, last commit index, last proposed block.
    dag_state: Arc<RwLock<DagState>>,
    /// Persistent storage for commits, commit votes and blocks.
    store: Arc<dyn Store>,
    /// Tracks received/accepted rounds locally and per peer.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
59
60impl<C: CoreThreadDispatcher> AuthorityService<C> {
61    pub(crate) fn new(
62        context: Arc<Context>,
63        block_verifier: Arc<dyn BlockVerifier>,
64        commit_vote_monitor: Arc<CommitVoteMonitor>,
65        round_tracker: Arc<RwLock<RoundTracker>>,
66        synchronizer: Arc<SynchronizerHandle>,
67        core_dispatcher: Arc<C>,
68        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
69        transaction_certifier: TransactionCertifier,
70        dag_state: Arc<RwLock<DagState>>,
71        store: Arc<dyn Store>,
72    ) -> Self {
73        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
74        Self {
75            context,
76            block_verifier,
77            commit_vote_monitor,
78            synchronizer,
79            core_dispatcher,
80            rx_block_broadcast,
81            subscription_counter,
82            transaction_certifier,
83            dag_state,
84            store,
85            round_tracker,
86        }
87    }
88
89    // Parses and validates serialized excluded ancestors.
90    fn parse_excluded_ancestors(
91        &self,
92        peer: AuthorityIndex,
93        block: &VerifiedBlock,
94        mut excluded_ancestors: Vec<Vec<u8>>,
95    ) -> ConsensusResult<Vec<BlockRef>> {
96        let peer_hostname = &self.context.committee.authority(peer).hostname;
97
98        let excluded_ancestors_limit = self.context.committee.size() * 2;
99        if excluded_ancestors.len() > excluded_ancestors_limit {
100            debug!(
101                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
102                excluded_ancestors.len() - excluded_ancestors_limit,
103                peer,
104                peer_hostname,
105            );
106            excluded_ancestors.truncate(excluded_ancestors_limit);
107        }
108
109        let excluded_ancestors = excluded_ancestors
110            .into_iter()
111            .map(|serialized| {
112                let block_ref: BlockRef =
113                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
114                if !self.context.committee.is_valid_index(block_ref.author) {
115                    return Err(ConsensusError::InvalidAuthorityIndex {
116                        index: block_ref.author,
117                        max: self.context.committee.size(),
118                    });
119                }
120                if block_ref.round >= block.round() {
121                    return Err(ConsensusError::InvalidAncestorRound {
122                        ancestor: block_ref.round,
123                        block: block.round(),
124                    });
125                }
126                Ok(block_ref)
127            })
128            .collect::<ConsensusResult<Vec<BlockRef>>>()?;
129
130        for excluded_ancestor in &excluded_ancestors {
131            let excluded_ancestor_hostname = &self
132                .context
133                .committee
134                .authority(excluded_ancestor.author)
135                .hostname;
136            self.context
137                .metrics
138                .node_metrics
139                .network_excluded_ancestors_count_by_authority
140                .with_label_values(&[excluded_ancestor_hostname])
141                .inc();
142        }
143        self.context
144            .metrics
145            .node_metrics
146            .network_received_excluded_ancestors_from_authority
147            .with_label_values(&[peer_hostname])
148            .inc_by(excluded_ancestors.len() as u64);
149
150        Ok(excluded_ancestors)
151    }
152}
153
154#[async_trait]
155impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    /// Handles a block pushed by `peer`.
    ///
    /// Pipeline: deserialize -> reject wrong author -> verify and vote ->
    /// parse excluded ancestors -> record metrics and vote/round observations ->
    /// reject if local commit lags quorum too far -> record own txn votes ->
    /// send to Core -> schedule background fetches for missing ancestors and
    /// missing excluded ancestors. Returns an error for any rejected block.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing parsing and validations.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        // Invalid excluded ancestors also reject the whole block.
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // Track how far ahead of local time the block's timestamp is
        // (saturating: blocks timestamped in the past contribute 0).
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the block for the commit votes. When local commit is lagging too much,
        // commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Update own received rounds and peer accepted rounds from this verified block.
        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject blocks when local commit index is lagging too far from quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // The block is verified and current, so record own votes on the block
        // before sending the block to Core.
        if self.context.protocol_config.mysticeti_fastpath() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Send the block to Core to try accepting it into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Schedule fetching missing soft links from this peer in the background.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }
340
341    async fn handle_subscribe_blocks(
342        &self,
343        peer: AuthorityIndex,
344        last_received: Round,
345    ) -> ConsensusResult<BlockStream> {
346        fail_point_async!("consensus-rpc-response");
347
348        // Find past proposed blocks as the initial blocks to send to the peer.
349        //
350        // If there are cached blocks in the range which the peer requested, send all of them.
351        // The size is bounded by the local GC round and DagState cache size.
352        //
353        // Otherwise if there is no cached block in the range which the peer requested,
354        // and this node has proposed blocks before, at least one block should be sent to the peer
355        // to help with liveness.
356        let past_proposed_blocks = {
357            let dag_state = self.dag_state.read();
358
359            let mut proposed_blocks =
360                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
361            if proposed_blocks.is_empty() {
362                let last_proposed_block = dag_state.get_last_proposed_block();
363                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
364                    vec![last_proposed_block]
365                } else {
366                    vec![]
367                };
368            }
369            stream::iter(
370                proposed_blocks
371                    .into_iter()
372                    .map(|block| ExtendedSerializedBlock {
373                        block: block.serialized().clone(),
374                        excluded_ancestors: vec![],
375                    }),
376            )
377        };
378
379        let broadcasted_blocks = BroadcastedBlockStream::new(
380            peer,
381            self.rx_block_broadcast.resubscribe(),
382            self.subscription_counter.clone(),
383        );
384
385        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
386        Ok(Box::pin(past_proposed_blocks.chain(
387            broadcasted_blocks.map(ExtendedSerializedBlock::from),
388        )))
389    }
390
    // Handles two types of requests:
    // 1. Missing block for block sync:
    //    - uses highest_accepted_rounds.
    //    - max_blocks_per_sync blocks should be returned.
    // 2. Committed block for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - max_blocks_per_fetch blocks should be returned.
    //
    // In the block-sync case, the response is padded with extra blocks beyond
    // the requested refs: either sampled missing ancestors (breadth_first) or
    // cached blocks filling the gap below each lowest missing round per
    // authority (depth-first). Requested refs not found locally are silently
    // omitted from the response.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // When provided, highest_accepted_rounds must cover the whole committee.
        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Some quick validation of the requested block refs
        let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get requested blocks from store.
        let blocks = if !highest_accepted_rounds.is_empty() {
            block_refs.sort();
            block_refs.dedup();
            let mut blocks = self
                .dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            if breadth_first {
                // Get unique missing ancestor blocks of the requested blocks.
                // An ancestor is "missing" when its round is above the peer's
                // highest accepted round for that authority.
                let mut missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
                if selected_num_blocks < missing_ancestors.len() {
                    missing_ancestors = missing_ancestors
                        .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                }
                let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
                blocks.extend(ancestor_blocks.into_iter().flatten());
            } else {
                // Get additional blocks from authorities with missing block, if they are available in cache.
                // Compute the lowest missing round per requested authority.
                let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                for block_ref in blocks.iter().map(|b| b.reference()) {
                    let entry = lowest_missing_rounds
                        .entry(block_ref.author)
                        .or_insert(block_ref.round);
                    *entry = (*entry).min(block_ref.round);
                }

                // Retrieve additional blocks per authority, from peer's highest accepted round + 1 to
                // lowest missing round (exclusive) per requested authority.
                // No block from other authorities are retrieved. It is possible that the requestor is not
                // seeing missing block from another authority, and serving a block would just lead to unnecessary
                // data transfer. Or missing blocks from other authorities are requested from other peers.
                let dag_state = self.dag_state.read();
                for (authority, lowest_missing_round) in lowest_missing_rounds {
                    let highest_accepted_round = highest_accepted_rounds[authority];
                    if highest_accepted_round >= lowest_missing_round {
                        continue;
                    }
                    let missing_blocks = dag_state.get_cached_blocks_in_range(
                        authority,
                        highest_accepted_round + 1,
                        lowest_missing_round,
                        self.context
                            .parameters
                            .max_blocks_per_sync
                            .saturating_sub(blocks.len()),
                    );
                    blocks.extend(missing_blocks);
                    if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                        blocks.truncate(self.context.parameters.max_blocks_per_sync);
                        break;
                    }
                }
            }

            blocks
        } else {
            // Commit sync path: return exactly the requested blocks found locally.
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        };

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }
528
    /// Serves commits in `commit_range` (capped to `commit_sync_batch_size`)
    /// along with blocks whose votes certify the last returned commit.
    ///
    /// Commits at the tail of the scanned range whose recorded votes do not
    /// reach a quorum are popped off and not returned, so every returned
    /// commit is covered by the certification of the final one.
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        // Walk backwards from the last scanned commit until one with a quorum
        // of votes is found; uncertified trailing commits are dropped.
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        // Load the blocks that carry the certifying votes; refs without a
        // stored block are skipped via flatten().
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }
578
579    async fn handle_fetch_latest_blocks(
580        &self,
581        peer: AuthorityIndex,
582        authorities: Vec<AuthorityIndex>,
583    ) -> ConsensusResult<Vec<Bytes>> {
584        fail_point_async!("consensus-rpc-response");
585
586        if authorities.len() > self.context.committee.size() {
587            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
588        }
589
590        // Ensure that those are valid authorities
591        for authority in &authorities {
592            if !self.context.committee.is_valid_index(*authority) {
593                return Err(ConsensusError::InvalidAuthorityIndex {
594                    index: *authority,
595                    max: self.context.committee.size(),
596                });
597            }
598        }
599
600        // Read from the dag state to find the latest blocks.
601        // TODO: at the moment we don't look into the block manager for suspended blocks. Ideally we
602        // want in the future if we think we would like to tackle the majority of cases.
603        let mut blocks = vec![];
604        let dag_state = self.dag_state.read();
605        for authority in authorities {
606            let block = dag_state.get_last_block_for_authority(authority);
607
608            debug!("Latest block for {authority}: {block:?} as requested from {peer}");
609
610            // no reason to serve back the genesis block - it's equal as if it has not received any block
611            if block.round() != GENESIS_ROUND {
612                blocks.push(block);
613            }
614        }
615
616        // Return the serialised blocks
617        let result = blocks
618            .into_iter()
619            .map(|block| block.serialized().clone())
620            .collect::<Vec<_>>();
621
622        Ok(result)
623    }
624
625    async fn handle_get_latest_rounds(
626        &self,
627        _peer: AuthorityIndex,
628    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
629        fail_point_async!("consensus-rpc-response");
630
631        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
632
633        let blocks = self
634            .dag_state
635            .read()
636            .get_last_cached_block_per_authority(Round::MAX);
637        let highest_accepted_rounds = blocks
638            .into_iter()
639            .map(|(block, _)| block.round())
640            .collect::<Vec<_>>();
641
642        Ok((highest_received_rounds, highest_accepted_rounds))
643    }
644}
645
/// Subscription bookkeeping, guarded by the mutex in `SubscriptionCounter`.
struct Counter {
    /// Total number of active subscriptions across all peers.
    count: usize,
    /// Number of active subscriptions per peer, indexed by `AuthorityIndex`.
    subscriptions_by_authority: Vec<usize>,
}
650
/// Atomically counts the number of active subscriptions to the block broadcast stream,
/// and mirrors per-peer subscription status into the `subscribed_by` gauge metric.
struct SubscriptionCounter {
    /// Shared context, used for committee lookups and metrics reporting.
    context: Arc<Context>,
    /// Mutable counters, protected by a mutex for cross-thread updates.
    counter: parking_lot::Mutex<Counter>,
}
656
657impl SubscriptionCounter {
658    fn new(context: Arc<Context>) -> Self {
659        // Set the subscribed peers by default to 0
660        for (_, authority) in context.committee.authorities() {
661            context
662                .metrics
663                .node_metrics
664                .subscribed_by
665                .with_label_values(&[authority.hostname.as_str()])
666                .set(0);
667        }
668
669        Self {
670            counter: parking_lot::Mutex::new(Counter {
671                count: 0,
672                subscriptions_by_authority: vec![0; context.committee.size()],
673            }),
674            context,
675        }
676    }
677
678    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
679        let mut counter = self.counter.lock();
680        counter.count += 1;
681        counter.subscriptions_by_authority[peer] += 1;
682
683        let peer_hostname = &self.context.committee.authority(peer).hostname;
684        self.context
685            .metrics
686            .node_metrics
687            .subscribed_by
688            .with_label_values(&[peer_hostname])
689            .set(1);
690
691        Ok(())
692    }
693
694    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
695        let mut counter = self.counter.lock();
696        counter.count -= 1;
697        counter.subscriptions_by_authority[peer] -= 1;
698
699        if counter.subscriptions_by_authority[peer] == 0 {
700            let peer_hostname = &self.context.committee.authority(peer).hostname;
701            self.context
702                .metrics
703                .node_metrics
704                .subscribed_by
705                .with_label_values(&[peer_hostname])
706                .set(0);
707        }
708
709        Ok(())
710    }
711}
712
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created;
/// lagged messages are logged and skipped instead of surfacing as errors.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
716
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    /// The subscribing authority; used for logging and subscription accounting.
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls, paired with the last recv result.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}
732
733impl<T: 'static + Clone + Send> BroadcastStream<T> {
734    pub fn new(
735        peer: AuthorityIndex,
736        rx: broadcast::Receiver<T>,
737        subscription_counter: Arc<SubscriptionCounter>,
738    ) -> Self {
739        if let Err(err) = subscription_counter.increment(peer) {
740            match err {
741                ConsensusError::Shutdown => {}
742                _ => panic!("Unexpected error: {err}"),
743            }
744        }
745        Self {
746            peer,
747            inner: ReusableBoxFuture::new(make_recv_future(rx)),
748            subscription_counter,
749        }
750    }
751}
752
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    /// Polls the stored recv future; the stream ends (`None`) when the sender
    /// is closed, and lag is only logged before retrying with the next message.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            // Re-arm the reusable future with the returned receiver for the next poll.
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    // Dropped messages are tolerated: log and keep receiving.
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}
783
784impl<T> Drop for BroadcastStream<T> {
785    fn drop(&mut self) {
786        if let Err(err) = self.subscription_counter.decrement(self.peer) {
787            match err {
788                ConsensusError::Shutdown => {}
789                _ => panic!("Unexpected error: {err}"),
790            }
791        }
792    }
793}
794
795async fn make_recv_future<T: Clone>(
796    mut rx: broadcast::Receiver<T>,
797) -> (
798    Result<T, broadcast::error::RecvError>,
799    broadcast::Receiver<T>,
800) {
801    let result = rx.recv().await;
802    (result, rx)
803}
804
805// TODO: add a unit test for BroadcastStream.
806
807#[cfg(test)]
808mod tests {
809    use std::{
810        collections::{BTreeMap, BTreeSet},
811        sync::Arc,
812        time::Duration,
813    };
814
815    use async_trait::async_trait;
816    use bytes::Bytes;
817    use consensus_config::AuthorityIndex;
818    use consensus_types::block::{BlockDigest, BlockRef, Round};
819    use mysten_metrics::monitored_mpsc;
820    use parking_lot::{Mutex, RwLock};
821    use tokio::{sync::broadcast, time::sleep};
822
823    use futures::StreamExt as _;
824
825    use crate::{
826        authority_service::AuthorityService,
827        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
828        commit::{CertifiedCommits, CommitRange},
829        commit_vote_monitor::CommitVoteMonitor,
830        context::Context,
831        core_thread::{CoreError, CoreThreadDispatcher},
832        dag_state::DagState,
833        error::ConsensusResult,
834        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
835        round_tracker::RoundTracker,
836        storage::mem_store::MemStore,
837        synchronizer::Synchronizer,
838        test_dag_builder::DagBuilder,
839        transaction_certifier::TransactionCertifier,
840    };
841    struct FakeCoreThreadDispatcher {
842        blocks: Mutex<Vec<VerifiedBlock>>,
843    }
844
845    impl FakeCoreThreadDispatcher {
846        fn new() -> Self {
847            Self {
848                blocks: Mutex::new(vec![]),
849            }
850        }
851
852        fn get_blocks(&self) -> Vec<VerifiedBlock> {
853            self.blocks.lock().clone()
854        }
855    }
856
    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        // Records the blocks and acknowledges all of their references.
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        // Pretends no block refs are missing.
        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        // Not exercised by the tests in this module.
        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(Default::default())
        }

        // Not exercised by the tests in this module.
        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
            todo!()
        }

        // Not exercised by the tests in this module.
        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }
    }
898
    // Network client stub: the tests in this module never reach the network,
    // so every method is left unimplemented and will panic if called.
    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
959
    // Verifies handle_send_block(): accepts a block whose timestamp is ahead
    // of local time (after a delay), and rejects invalid blocks and invalid
    // excluded-ancestor payloads.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        // Wire an AuthorityService to fake dispatcher/network so that
        // handle_send_block() can be exercised in isolation.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        // The block's timestamp is max_forward_time_drift ahead of "now".
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // handle_send_block() may delay internally, so drive it from a
        // separate task while the test advances the (paused) clock.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        sleep(max_drift / 2).await;

        // The drifted block should have reached the core dispatcher by now.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block.
        // NOTE(review): 1000 appears to be an author index outside the
        // 4-member committee, which should fail verification — confirm
        // against TestBlock::new().
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors.
        // Mix of an out-of-committee author ref, undecodable bytes, and a
        // ref to the invalid block above; the request must be rejected.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1074
    // Verifies handle_fetch_blocks(): breadth-first and depth-first ancestor
    // fetching are capped at max_blocks_per_sync, and requests without
    // highest_accepted_rounds return only the requested blocks.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN
        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // GIVEN: 40 rounds of blocks in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        let all_blocks = dag_builder.all_blocks();

        // WHEN: Request 2 blocks from round 40, get ancestors breadth first.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Breadth-first fill: the remaining budget is spent on the previous
        // round (round 39) before going deeper.
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: Request 2 blocks from round 37, get ancestors depth first.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Try to fill up the blocks from the 1st authority in missing_block_refs.
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Ancestor blocks are from the expected rounds and authorities.
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: Request 5 blocks from round 30 (NUM_ROUNDS - 10), with empty
        // highest_accepted_rounds so no ancestors are fetched.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: only the 5 requested blocks are returned, no ancestors.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }
1245
    // Verifies handle_fetch_latest_blocks(): returns the latest (round 10)
    // blocks of the requested authorities, including equivocating authorities.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        // Note: unlike the other tests, the synchronizer is started with its
        // last flag set to true here.
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN: every returned block deserializes and is from the highest
        // built round (10).
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1319
    // Verifies handle_subscribe_blocks(): a subscriber that is ahead gets the
    // last proposed block as a fallback, and a subscriber that is behind gets
    // the cached own blocks after its last received round.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
        // Should return last proposed block (round 15) as fallback
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 100)
                .await
                .unwrap();
            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(
                block.round(),
                15,
                "Should return last proposed block as fallback"
            );
            assert_eq!(block.author().value(), 0);
        }

        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
        // Should return cached blocks from round 8+
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 7)
                .await
                .unwrap();

            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block1.round(), 10, "Should return block at round 10");

            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block2.round(), 15, "Should return block at round 15");
        }
    }
1409
    // Verifies handle_subscribe_blocks() when only genesis exists: the stream
    // must stay pending rather than sending the genesis block.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks_not_proposed() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // No blocks added to DagState - only genesis exists

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Subscribe - no blocks have been proposed yet (only genesis exists)
        let mut stream = authority_service
            .handle_subscribe_blocks(peer, 0)
            .await
            .unwrap();

        // Should NOT receive any block (genesis must not be returned)
        use futures::poll;
        use std::task::Poll;
        let poll_result = poll!(stream.next());
        assert!(
            matches!(poll_result, Poll::Pending),
            "Should not receive genesis block on subscription stream"
        );
    }
1474}