consensus_core/
authority_service.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet},
    pin::Pin,
    sync::Arc,
    time::Duration,
};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use consensus_types::block::{BlockRef, Round};
use futures::{Stream, StreamExt, ready, stream, task};
use mysten_metrics::spawn_monitored_task;
use parking_lot::RwLock;
use rand::seq::SliceRandom as _;
use sui_macros::fail_point_async;
use tap::TapFallible;
use tokio::sync::broadcast;
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex,
    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    round_tracker::RoundTracker,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::SynchronizerHandle,
    transaction_certifier::TransactionCertifier,
};

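/// Multiplier applied to `commit_sync_batch_size` to derive the commit lag threshold
/// beyond which incoming blocks are rejected in `handle_send_block()`.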
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    transaction_certifier: TransactionCertifier,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
    round_tracker: Arc<RwLock<RoundTracker>>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        round_tracker: Arc<RwLock<RoundTracker>>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
        transaction_certifier: TransactionCertifier,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcast,
            subscription_counter,
            transaction_certifier,
            dag_state,
            store,
            round_tracker,
        }
    }

    // Parses and validates serialized excluded ancestors.
    fn parse_excluded_ancestors(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        mut excluded_ancestors: Vec<Vec<u8>>,
    ) -> ConsensusResult<Vec<BlockRef>> {
        let peer_hostname = &self.context.committee.authority(peer).hostname;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        let excluded_ancestors = excluded_ancestors
            .into_iter()
            .map(|serialized| {
                let block_ref: BlockRef =
                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
                if !self.context.committee.is_valid_index(block_ref.author) {
                    return Err(ConsensusError::InvalidAuthorityIndex {
                        index: block_ref.author,
                        max: self.context.committee.size(),
                    });
                }
                if block_ref.round >= block.round() {
                    return Err(ConsensusError::InvalidAncestorRound {
                        ancestor: block_ref.round,
                        block: block.round(),
                    });
                }
                Ok(block_ref)
            })
            .collect::<ConsensusResult<Vec<BlockRef>>>()?;

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }
        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        Ok(excluded_ancestors)
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[peer_hostname, "handle_send_block", "UnexpectedAuthority"])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing parsing and validations.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname, "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the commit votes in the block. When the local commit index lags
        // too far behind, the commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Update own received rounds and peer accepted rounds from this verified block.
        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject blocks when local commit index is lagging too far from quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold for ignoring blocks should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // The block is verified and current, so record own votes on the block
        // before sending the block to Core.
        if self.context.protocol_config.mysticeti_fastpath() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Send the block to Core to try accepting it into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Schedule fetching missing soft links from this peer in the background.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }

    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        // Find past proposed blocks as the initial blocks to send to the peer.
        //
        // If there are cached blocks in the requested range, send all of them.
        // The size is bounded by the local GC round and the DagState cache size.
        //
        // Otherwise, if there is no cached block in the requested range but this node
        // has proposed blocks before, send at least the last proposed block to help
        // with liveness.
        let past_proposed_blocks = {
            let dag_state = self.dag_state.read();

            let mut proposed_blocks =
                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
            if proposed_blocks.is_empty() {
                let last_proposed_block = dag_state.get_last_proposed_block();
                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
                    vec![last_proposed_block]
                } else {
                    vec![]
                };
            }
            stream::iter(
                proposed_blocks
                    .into_iter()
                    .map(|block| ExtendedSerializedBlock {
                        block: block.serialized().clone(),
                        excluded_ancestors: vec![],
                    }),
            )
        };

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcast.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
        Ok(Box::pin(past_proposed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles two types of requests:
    // 1. Missing blocks for block sync:
    //    - uses highest_accepted_rounds.
    //    - at most max_blocks_per_sync blocks are returned.
    // 2. Committed blocks for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - at most max_blocks_per_fetch blocks are returned.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Bound the number of requested block refs.
        let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get requested blocks from store.
        let blocks = if !highest_accepted_rounds.is_empty() {
            block_refs.sort();
            block_refs.dedup();
            let mut blocks = self
                .dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            if breadth_first {
                // Get unique missing ancestor blocks of the requested blocks.
                let mut missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
                if selected_num_blocks < missing_ancestors.len() {
                    missing_ancestors = missing_ancestors
                        .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                }
                let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
                blocks.extend(ancestor_blocks.into_iter().flatten());
            } else {
                // Get additional blocks from authorities with missing blocks, if they are available in the cache.
                // Compute the lowest missing round per requested authority.
                let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                for block_ref in blocks.iter().map(|b| b.reference()) {
                    let entry = lowest_missing_rounds
                        .entry(block_ref.author)
                        .or_insert(block_ref.round);
                    *entry = (*entry).min(block_ref.round);
                }

                // Retrieve additional blocks per authority, from the peer's highest accepted
                // round + 1 up to (but excluding) the lowest missing round of that authority.
                // No blocks from other authorities are retrieved: the requestor may not be
                // missing blocks from those authorities, in which case serving them would just
                // waste bandwidth, or it may be requesting them from other peers already.
                let dag_state = self.dag_state.read();
                for (authority, lowest_missing_round) in lowest_missing_rounds {
                    let highest_accepted_round = highest_accepted_rounds[authority];
                    if highest_accepted_round >= lowest_missing_round {
                        continue;
                    }
                    let missing_blocks = dag_state.get_cached_blocks_in_range(
                        authority,
                        highest_accepted_round + 1,
                        lowest_missing_round,
                        self.context
                            .parameters
                            .max_blocks_per_sync
                            .saturating_sub(blocks.len()),
                    );
                    blocks.extend(missing_blocks);
                    if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                        blocks.truncate(self.context.parameters.max_blocks_per_sync);
                        break;
                    }
                }
            }

            blocks
        } else {
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        };

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure the requested authorities are valid.
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended blocks.
        // Ideally we would in the future, if we decide to tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // No reason to serve back the genesis block - it is equivalent to the
            // authority not having produced any block.
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialized blocks
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

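/// Tracks the total number of active subscriptions and the count per authority.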
struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

/// Atomically counts the number of active subscriptions to the block broadcast stream.
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>) -> Self {
        // Initialize the subscribed_by metric to 0 for all peers.
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            context,
        }
    }

    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        counter.subscriptions_by_authority[peer] += 1;

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);

        Ok(())
    }

    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        counter.subscriptions_by_authority[peer] -= 1;

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        Ok(())
    }
}

/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

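/// Awaits the next broadcasted item and returns it together with the receiver,
/// so the receiver can be stored back into the `ReusableBoxFuture`.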
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

// TODO: add a thorough unit test for BroadcastStream. A minimal sketch is included
// at the end of the tests module below.

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockDigest, BlockRef, Round};
    use futures::StreamExt as _;
    use mysten_metrics::monitored_mpsc;
    use parking_lot::{Mutex, RwLock};
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        authority_service::AuthorityService,
        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_tracker::RoundTracker,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
        transaction_certifier::TransactionCertifier,
    };

    struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(Default::default())
        }

        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state,
            store,
        ));

        // Test handling a block with forward time drift.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        sleep(max_drift / 2).await;

        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN
        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // GIVEN: 40 rounds of blocks in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        let all_blocks = dag_builder.all_blocks();

        // WHEN: Request 2 blocks from round 40, get ancestors breadth first.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: Request 2 blocks from round 37, get ancestors depth first.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Try to fill up the blocks from the 1st authority in missing_block_refs.
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Ancestor blocks are from the expected rounds and authorities.
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: Request 5 blocks from round 30, without fetching ancestors.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
        // Should return last proposed block (round 15) as fallback
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 100)
                .await
                .unwrap();
            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(
                block.round(),
                15,
                "Should return last proposed block as fallback"
            );
            assert_eq!(block.author().value(), 0);
        }

        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
        // Should return cached blocks from round 8+
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 7)
                .await
                .unwrap();

            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block1.round(), 10, "Should return block at round 10");

            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block2.round(), 15, "Should return block at round 15");
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks_not_proposed() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // No blocks added to DagState - only genesis exists

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Subscribe - no blocks have been proposed yet (only genesis exists)
        let mut stream = authority_service
            .handle_subscribe_blocks(peer, 0)
            .await
            .unwrap();

        // Should NOT receive any block (genesis must not be returned)
        use futures::poll;
        use std::task::Poll;
        let poll_result = poll!(stream.next());
        assert!(
            matches!(poll_result, Poll::Pending),
            "Should not receive genesis block on subscription stream"
        );
    }
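
    // Addresses the TODO above the tests module: a minimal sketch test for
    // BroadcastStream, assuming tokio broadcast semantics (a lagged receiver skips
    // overwritten items and resumes from the oldest retained one). The channel
    // capacity and payload values below are illustrative only.
    #[tokio::test]
    async fn test_broadcast_stream() {
        use super::{BroadcastStream, SubscriptionCounter};

        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));

        // Use a small capacity so the lag path is easy to trigger.
        let (tx, rx) = broadcast::channel::<u32>(2);
        let mut stream =
            BroadcastStream::new(AuthorityIndex::new_for_test(1), rx, subscription_counter);

        // Items broadcast after the stream is created are yielded in order.
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        assert_eq!(stream.next().await, Some(1));
        assert_eq!(stream.next().await, Some(2));

        // Overflow the channel: items 3 and 4 get overwritten. The stream should
        // log the lag and continue from item 5, instead of yielding an error.
        for i in 3..=6 {
            tx.send(i).unwrap();
        }
        assert_eq!(stream.next().await, Some(5));
        assert_eq!(stream.next().await, Some(6));

        // Dropping the sender closes the channel and terminates the stream.
        drop(tx);
        assert_eq!(stream.next().await, None);
    }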
}