consensus_core/
authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use consensus_config::AuthorityIndex;
9use consensus_types::block::{BlockRef, Round};
10use futures::{Stream, StreamExt, ready, stream, task};
11use mysten_metrics::spawn_monitored_task;
12use parking_lot::RwLock;
13use sui_macros::fail_point_async;
14use tap::TapFallible;
15use tokio::sync::broadcast;
16use tokio_util::sync::ReusableBoxFuture;
17use tracing::{debug, info, warn};
18
19use crate::{
20    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
21    block_sync_service::BlockSyncService,
22    block_verifier::BlockVerifier,
23    commit::{CommitRange, TrustedCommit},
24    commit_vote_monitor::{CommitVoteMonitor, is_commit_lagging},
25    context::Context,
26    core_thread::CoreThreadDispatcher,
27    dag_state::DagState,
28    error::{ConsensusError, ConsensusResult},
29    network::{BlockStream, ExtendedSerializedBlock, PeerId, ValidatorNetworkService},
30    round_tracker::RoundTracker,
31    synchronizer::SynchronizerHandle,
32    transaction_vote_tracker::TransactionVoteTracker,
33};
34
35/// Authority's network service implementation, agnostic to the actual networking stack used.
36pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
37    context: Arc<Context>,
38    commit_vote_monitor: Arc<CommitVoteMonitor>,
39    block_verifier: Arc<dyn BlockVerifier>,
40    synchronizer: Arc<SynchronizerHandle>,
41    core_dispatcher: Arc<C>,
42    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
43    subscription_counter: Arc<SubscriptionCounter>,
44    transaction_vote_tracker: TransactionVoteTracker,
45    dag_state: Arc<RwLock<DagState>>,
46    round_tracker: Arc<RwLock<RoundTracker>>,
47    block_sync_service: Arc<BlockSyncService>,
48}
49
50impl<C: CoreThreadDispatcher> AuthorityService<C> {
51    pub(crate) fn new(
52        context: Arc<Context>,
53        block_verifier: Arc<dyn BlockVerifier>,
54        commit_vote_monitor: Arc<CommitVoteMonitor>,
55        round_tracker: Arc<RwLock<RoundTracker>>,
56        synchronizer: Arc<SynchronizerHandle>,
57        core_dispatcher: Arc<C>,
58        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
59        transaction_vote_tracker: TransactionVoteTracker,
60        dag_state: Arc<RwLock<DagState>>,
61        block_sync_service: Arc<BlockSyncService>,
62    ) -> Self {
63        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
64        Self {
65            context,
66            block_verifier,
67            commit_vote_monitor,
68            synchronizer,
69            core_dispatcher,
70            rx_block_broadcast,
71            subscription_counter,
72            transaction_vote_tracker,
73            dag_state,
74            round_tracker,
75            block_sync_service,
76        }
77    }
78
79    // Parses and validates serialized excluded ancestors.
80    fn parse_excluded_ancestors(
81        &self,
82        peer: AuthorityIndex,
83        block: &VerifiedBlock,
84        mut excluded_ancestors: Vec<Vec<u8>>,
85    ) -> ConsensusResult<Vec<BlockRef>> {
86        let peer_hostname = &self.context.committee.authority(peer).hostname;
87
88        let excluded_ancestors_limit = self.context.committee.size() * 2;
89        if excluded_ancestors.len() > excluded_ancestors_limit {
90            debug!(
91                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
92                excluded_ancestors.len() - excluded_ancestors_limit,
93                peer,
94                peer_hostname,
95            );
96            excluded_ancestors.truncate(excluded_ancestors_limit);
97        }
98
99        let excluded_ancestors = excluded_ancestors
100            .into_iter()
101            .map(|serialized| {
102                let block_ref: BlockRef =
103                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
104                if !self.context.committee.is_valid_index(block_ref.author) {
105                    return Err(ConsensusError::InvalidAuthorityIndex {
106                        index: block_ref.author,
107                        max: self.context.committee.size(),
108                    });
109                }
110                if block_ref.round >= block.round() {
111                    return Err(ConsensusError::InvalidAncestorRound {
112                        ancestor: block_ref.round,
113                        block: block.round(),
114                    });
115                }
116                Ok(block_ref)
117            })
118            .collect::<ConsensusResult<Vec<BlockRef>>>()?;
119
120        for excluded_ancestor in &excluded_ancestors {
121            let excluded_ancestor_hostname = &self
122                .context
123                .committee
124                .authority(excluded_ancestor.author)
125                .hostname;
126            self.context
127                .metrics
128                .node_metrics
129                .network_excluded_ancestors_count_by_authority
130                .with_label_values(&[excluded_ancestor_hostname])
131                .inc();
132        }
133        self.context
134            .metrics
135            .node_metrics
136            .network_received_excluded_ancestors_from_authority
137            .with_label_values(&[peer_hostname])
138            .inc_by(excluded_ancestors.len() as u64);
139
140        Ok(excluded_ancestors)
141    }
142}
143
144#[async_trait]
145impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
146    async fn handle_send_block(
147        &self,
148        peer: AuthorityIndex,
149        serialized_block: ExtendedSerializedBlock,
150    ) -> ConsensusResult<()> {
151        fail_point_async!("consensus-rpc-response");
152
153        let peer_hostname = &self.context.committee.authority(peer).hostname;
154
155        // TODO: dedup block verifications, here and with fetched blocks.
156        let signed_block: SignedBlock =
157            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
158
159        // Reject blocks not produced by the peer.
160        if peer != signed_block.author() {
161            self.context
162                .metrics
163                .node_metrics
164                .invalid_blocks
165                .with_label_values(&[
166                    peer_hostname.as_str(),
167                    "handle_send_block",
168                    "UnexpectedAuthority",
169                ])
170                .inc();
171            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
172            info!("Block with wrong authority from {}: {}", peer, e);
173            return Err(e);
174        }
175
176        // Reject blocks failing parsing and validations.
177        let (verified_block, reject_txn_votes) = self
178            .block_verifier
179            .verify_and_vote(signed_block, serialized_block.block)
180            .tap_err(|e| {
181                self.context
182                    .metrics
183                    .node_metrics
184                    .invalid_blocks
185                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
186                    .inc();
187                info!("Invalid block from {}: {}", peer, e);
188            })?;
189        let excluded_ancestors = self
190            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
191            .tap_err(|e| {
192                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
193                self.context
194                    .metrics
195                    .node_metrics
196                    .invalid_blocks
197                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
198                    .inc();
199            })?;
200
201        let block_ref = verified_block.reference();
202        debug!("Received block {} via send block.", block_ref);
203
204        self.context
205            .metrics
206            .node_metrics
207            .verified_blocks
208            .with_label_values(&[peer_hostname])
209            .inc();
210
211        let now = self.context.clock.timestamp_utc_ms();
212        let forward_time_drift =
213            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
214
215        self.context
216            .metrics
217            .node_metrics
218            .block_timestamp_drift_ms
219            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
220            .inc_by(forward_time_drift.as_millis() as u64);
221
222        // Observe the block for the commit votes. When local commit is lagging too much,
223        // commit sync loop will trigger fetching.
224        self.commit_vote_monitor.observe_block(&verified_block);
225
226        // Update own received rounds and peer accepted rounds from this verified block.
227        self.round_tracker
228            .write()
229            .update_from_verified_block(&ExtendedBlock {
230                block: verified_block.clone(),
231                excluded_ancestors: excluded_ancestors.clone(),
232            });
233
234        // Reject blocks when local commit index is lagging too far from quorum commit index,
235        // to avoid the memory overhead from suspended blocks.
236        //
237        // IMPORTANT: this must be done after observing votes from the block, otherwise
238        // observed quorum commit will no longer progress.
239        //
240        // Since the main issue with too many suspended blocks is memory usage not CPU,
241        // it is ok to reject after block verifications instead of before.
242        let last_commit_index = self.dag_state.read().last_commit_index();
243        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
244        if is_commit_lagging(
245            self.context.as_ref(),
246            last_commit_index,
247            quorum_commit_index,
248        ) {
249            self.context
250                .metrics
251                .node_metrics
252                .rejected_blocks
253                .with_label_values(&["commit_lagging"])
254                .inc();
255            debug!(
256                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
257                block_ref, last_commit_index, quorum_commit_index,
258            );
259            return Err(ConsensusError::BlockRejected {
260                block_ref,
261                reason: format!(
262                    "Last commit index is lagging quorum commit index too much ({} < {})",
263                    last_commit_index, quorum_commit_index,
264                ),
265            });
266        }
267
268        // The block is verified and current, so record own votes on the block
269        // before sending the block to Core.
270        if self.context.protocol_config.transaction_voting_enabled() {
271            self.transaction_vote_tracker
272                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
273        }
274
275        // Send the block to Core to try accepting it into the DAG.
276        let missing_ancestors = self
277            .core_dispatcher
278            .add_blocks(vec![verified_block.clone()])
279            .await
280            .map_err(|_| ConsensusError::Shutdown)?;
281
282        // Schedule fetching missing ancestors from this peer in the background.
283        if !missing_ancestors.is_empty() {
284            self.context
285                .metrics
286                .node_metrics
287                .handler_received_block_missing_ancestors
288                .with_label_values(&[peer_hostname])
289                .inc_by(missing_ancestors.len() as u64);
290            let synchronizer = self.synchronizer.clone();
291            spawn_monitored_task!(async move {
292                // This does not wait for the fetch request to complete.
293                // It only waits for synchronizer to queue the request to a peer.
294                // When this fails, it usually means the queue is full.
295                // The fetch will retry from other peers via live and periodic syncs.
296                if let Err(err) = synchronizer
297                    .fetch_blocks(missing_ancestors, PeerId::Validator(peer))
298                    .await
299                {
300                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
301                }
302            });
303        }
304
305        // Schedule fetching missing soft links from this peer in the background.
306        let missing_excluded_ancestors = self
307            .core_dispatcher
308            .check_block_refs(excluded_ancestors)
309            .await
310            .map_err(|_| ConsensusError::Shutdown)?;
311        if !missing_excluded_ancestors.is_empty() {
312            self.context
313                .metrics
314                .node_metrics
315                .network_excluded_ancestors_sent_to_fetch
316                .with_label_values(&[peer_hostname])
317                .inc_by(missing_excluded_ancestors.len() as u64);
318
319            let synchronizer = self.synchronizer.clone();
320            spawn_monitored_task!(async move {
321                if let Err(err) = synchronizer
322                    .fetch_blocks(missing_excluded_ancestors, PeerId::Validator(peer))
323                    .await
324                {
325                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
326                }
327            });
328        }
329
330        Ok(())
331    }
332
333    async fn handle_subscribe_blocks(
334        &self,
335        peer: AuthorityIndex,
336        last_received: Round,
337    ) -> ConsensusResult<BlockStream> {
338        fail_point_async!("consensus-rpc-response");
339
340        // Find past proposed blocks as the initial blocks to send to the peer.
341        //
342        // If there are cached blocks in the range which the peer requested, send all of them.
343        // The size is bounded by the local GC round and DagState cache size.
344        //
345        // Otherwise if there is no cached block in the range which the peer requested,
346        // and this node has proposed blocks before, at least one block should be sent to the peer
347        // to help with liveness.
348        let past_proposed_blocks = {
349            let dag_state = self.dag_state.read();
350
351            let mut proposed_blocks =
352                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
353            if proposed_blocks.is_empty() {
354                let last_proposed_block = dag_state
355                    .get_last_proposed_block()
356                    .expect("Last proposed block should be returned on validators");
357                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
358                    vec![last_proposed_block]
359                } else {
360                    vec![]
361                };
362            }
363            stream::iter(
364                proposed_blocks
365                    .into_iter()
366                    .map(|block| ExtendedSerializedBlock {
367                        block: block.serialized().clone(),
368                        excluded_ancestors: vec![],
369                    }),
370            )
371        };
372
373        const MAX_BLOCKS_PER_POLL: usize = 1;
374        let broadcasted_blocks = BroadcastedBlockStream::new(
375            PeerId::Validator(peer),
376            self.rx_block_broadcast.resubscribe(),
377            MAX_BLOCKS_PER_POLL,
378            self.subscription_counter.clone(),
379        );
380
381        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
382        Ok(Box::pin(past_proposed_blocks.chain(
383            broadcasted_blocks.flat_map(|items| {
384                debug_assert!(
385                    items.len() <= MAX_BLOCKS_PER_POLL,
386                    "Too many blocks received from broadcast"
387                );
388                stream::iter(items.into_iter().map(ExtendedSerializedBlock::from))
389            }),
390        )))
391    }
392
393    // Handles 3 types of requests:
394    // 1. Live sync:
395    //    - Both missing block refs and highest accepted rounds are specified.
396    //    - fetch_missing_ancestors is true.
397    //    - response returns max_blocks_per_sync blocks.
398    // 2. Periodic sync:
399    //    - Highest accepted rounds must be specified.
400    //    - Missing block refs are optional.
401    //    - fetch_missing_ancestors is false (default).
402    //    - response returns max_blocks_per_fetch blocks.
403    // 3. Commit sync:
404    //    - Missing block refs are specified.
405    //    - Highest accepted rounds are empty.
406    //    - fetch_missing_ancestors is false (default).
407    //    - response returns max_blocks_per_fetch blocks.
408    async fn handle_fetch_blocks(
409        &self,
410        _peer: AuthorityIndex,
411        block_refs: Vec<BlockRef>,
412        fetch_after_rounds: Vec<Round>,
413        fetch_missing_ancestors: bool,
414    ) -> ConsensusResult<Vec<Bytes>> {
415        fail_point_async!("consensus-rpc-response");
416
417        // Delegate to BlockSyncService
418        self.block_sync_service
419            .fetch_blocks(block_refs, fetch_after_rounds, fetch_missing_ancestors)
420            .await
421    }
422
423    async fn handle_fetch_commits(
424        &self,
425        _peer: AuthorityIndex,
426        commit_range: CommitRange,
427    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
428        fail_point_async!("consensus-rpc-response");
429
430        // Delegate to BlockSyncService
431        self.block_sync_service.fetch_commits(commit_range).await
432    }
433
434    async fn handle_fetch_latest_blocks(
435        &self,
436        peer: AuthorityIndex,
437        authorities: Vec<AuthorityIndex>,
438    ) -> ConsensusResult<Vec<Bytes>> {
439        fail_point_async!("consensus-rpc-response");
440
441        // Delegate to BlockSyncService
442        self.block_sync_service
443            .fetch_latest_blocks(peer, authorities)
444            .await
445    }
446
447    async fn handle_get_latest_rounds(
448        &self,
449        _peer: AuthorityIndex,
450    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
451        fail_point_async!("consensus-rpc-response");
452
453        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
454
455        let blocks = self
456            .dag_state
457            .read()
458            .get_last_cached_block_per_authority(Round::MAX);
459        let highest_accepted_rounds = blocks
460            .into_iter()
461            .map(|(block, _)| block.round())
462            .collect::<Vec<_>>();
463
464        Ok((highest_received_rounds, highest_accepted_rounds))
465    }
466}
467
468struct Counter {
469    count: usize,
470    subscriptions_by_peer: BTreeMap<PeerId, usize>,
471}
472
473/// Atomically counts the number of active subscriptions to the block broadcast stream.
474pub(crate) struct SubscriptionCounter {
475    context: Arc<Context>,
476    counter: parking_lot::Mutex<Counter>,
477}
478
479impl SubscriptionCounter {
480    pub(crate) fn new(context: Arc<Context>) -> Self {
481        // Set the subscribed peers by default to 0
482        for (_, authority) in context.committee.authorities() {
483            context
484                .metrics
485                .node_metrics
486                .subscribed_by
487                .with_label_values(&[authority.hostname.as_str()])
488                .set(0);
489        }
490
491        Self {
492            counter: parking_lot::Mutex::new(Counter {
493                count: 0,
494                subscriptions_by_peer: BTreeMap::new(),
495            }),
496            context,
497        }
498    }
499
500    fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
501        let mut counter = self.counter.lock();
502        counter.count += 1;
503        *counter
504            .subscriptions_by_peer
505            .entry(peer.clone())
506            .or_insert(0) += 1;
507
508        match peer {
509            PeerId::Validator(authority) => {
510                let peer_hostname = &self.context.committee.authority(*authority).hostname;
511                self.context
512                    .metrics
513                    .node_metrics
514                    .subscribed_by
515                    .with_label_values(&[peer_hostname])
516                    .set(1);
517            }
518            PeerId::Observer(_) => {
519                self.context
520                    .metrics
521                    .node_metrics
522                    .subscribed_by
523                    .with_label_values(&["observer"])
524                    .inc();
525            }
526        }
527
528        Ok(())
529    }
530
531    fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
532        let mut counter = self.counter.lock();
533        counter.count -= 1;
534        *counter
535            .subscriptions_by_peer
536            .entry(peer.clone())
537            .or_insert(0) -= 1;
538
539        if counter.subscriptions_by_peer[peer] == 0 {
540            match peer {
541                PeerId::Validator(authority) => {
542                    let peer_hostname = &self.context.committee.authority(*authority).hostname;
543                    self.context
544                        .metrics
545                        .node_metrics
546                        .subscribed_by
547                        .with_label_values(&[peer_hostname])
548                        .set(0);
549                }
550                PeerId::Observer(_) => {
551                    self.context
552                        .metrics
553                        .node_metrics
554                        .subscribed_by
555                        .with_label_values(&["observer"])
556                        .dec();
557                }
558            }
559        }
560
561        Ok(())
562    }
563}
564
565/// Each broadcasted block stream wraps a broadcast receiver for blocks.
566/// It yields blocks that are broadcasted after the stream is created.
567type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
568
569/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
570/// this tolerates lags with only logging, without yielding errors.
571pub(crate) struct BroadcastStream<T> {
572    peer: Option<PeerId>,
573    // Stores the receiver across poll_next() calls.
574    inner: ReusableBoxFuture<
575        'static,
576        (
577            Result<T, broadcast::error::RecvError>,
578            broadcast::Receiver<T>,
579        ),
580    >,
581    // Maximum number of items to return per poll.
582    max_items_per_poll: usize,
583    // Counts total subscriptions / active BroadcastStreams.
584    subscription_counter: Option<Arc<SubscriptionCounter>>,
585}
586
587impl<T: 'static + Clone + Send> BroadcastStream<T> {
588    pub fn new(
589        peer: PeerId,
590        rx: broadcast::Receiver<T>,
591        max_items_per_poll: usize,
592        subscription_counter: Arc<SubscriptionCounter>,
593    ) -> Self {
594        assert!(max_items_per_poll > 0, "max_items_per_poll must be > 0");
595        if let Err(err) = subscription_counter.increment(&peer) {
596            match err {
597                ConsensusError::Shutdown => {}
598                _ => panic!("Unexpected error: {err}"),
599            }
600        }
601        Self {
602            peer: Some(peer),
603            inner: ReusableBoxFuture::new(make_recv_future(rx)),
604            max_items_per_poll,
605            subscription_counter: Some(subscription_counter),
606        }
607    }
608
609    /// Creates a stream without subscription tracking.
610    pub fn new_untracked(rx: broadcast::Receiver<T>, max_items_per_poll: usize) -> Self {
611        assert!(max_items_per_poll > 0, "max_items_per_poll must be > 0");
612        Self {
613            peer: None,
614            inner: ReusableBoxFuture::new(make_recv_future(rx)),
615            max_items_per_poll,
616            subscription_counter: None,
617        }
618    }
619}
620
621impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
622    type Item = Vec<T>;
623
624    fn poll_next(
625        mut self: Pin<&mut Self>,
626        cx: &mut task::Context<'_>,
627    ) -> task::Poll<Option<Self::Item>> {
628        loop {
629            let (result, mut rx) = ready!(self.inner.poll(cx));
630
631            match result {
632                Ok(item) => {
633                    let mut items = Vec::new();
634                    items.push(item);
635
636                    // Drain any additional items that are already available, up to the cap.
637                    while items.len() < self.max_items_per_poll {
638                        match rx.try_recv() {
639                            Ok(item) => items.push(item),
640                            Err(broadcast::error::TryRecvError::Empty) => break,
641                            Err(broadcast::error::TryRecvError::Closed) => break,
642                            Err(broadcast::error::TryRecvError::Lagged(n)) => {
643                                warn!("BroadcastStream {:?} lagged by {} messages", self.peer, n);
644                                break;
645                            }
646                        }
647                    }
648
649                    self.inner.set(make_recv_future(rx));
650                    return task::Poll::Ready(Some(items));
651                }
652                Err(broadcast::error::RecvError::Closed) => {
653                    info!("BroadcastStream {:?} closed", self.peer);
654                    return task::Poll::Ready(None);
655                }
656                Err(broadcast::error::RecvError::Lagged(n)) => {
657                    warn!("BroadcastStream {:?} lagged by {} messages", self.peer, n);
658                    // Re-arm the future and loop to await the next item.
659                    self.inner.set(make_recv_future(rx));
660                    continue;
661                }
662            }
663        }
664    }
665}
666
667impl<T> Drop for BroadcastStream<T> {
668    fn drop(&mut self) {
669        if let (Some(counter), Some(peer)) = (&self.subscription_counter, &self.peer)
670            && let Err(err) = counter.decrement(peer)
671        {
672            match err {
673                ConsensusError::Shutdown => {}
674                _ => panic!("Unexpected error: {err}"),
675            }
676        }
677    }
678}
679
680async fn make_recv_future<T: Clone>(
681    mut rx: broadcast::Receiver<T>,
682) -> (
683    Result<T, broadcast::error::RecvError>,
684    broadcast::Receiver<T>,
685) {
686    let result = rx.recv().await;
687    (result, rx)
688}
689
690#[cfg(test)]
691mod tests {
692    use std::{
693        collections::{BTreeMap, BTreeSet},
694        sync::Arc,
695        time::Duration,
696    };
697
698    use async_trait::async_trait;
699    use bytes::Bytes;
700    use consensus_config::AuthorityIndex;
701    use consensus_types::block::{BlockDigest, BlockRef, Round};
702    use parking_lot::{Mutex, RwLock};
703    use tokio::{sync::broadcast, time::sleep};
704
705    use futures::StreamExt as _;
706
707    use crate::{
708        authority_service::AuthorityService,
709        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
710        block_sync_service::BlockSyncService,
711        commit::{CertifiedCommits, CommitRange},
712        commit_vote_monitor::CommitVoteMonitor,
713        context::Context,
714        core_thread::{CoreError, CoreThreadDispatcher},
715        dag_state::DagState,
716        error::ConsensusResult,
717        network::{
718            BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
719            ValidatorNetworkClient, ValidatorNetworkService,
720        },
721        peers_pool::PeersPool,
722        round_tracker::RoundTracker,
723        storage::mem_store::MemStore,
724        synchronizer::Synchronizer,
725        test_dag_builder::DagBuilder,
726        transaction_vote_tracker::TransactionVoteTracker,
727    };
728    struct FakeCoreThreadDispatcher {
729        blocks: Mutex<Vec<VerifiedBlock>>,
730    }
731
732    impl FakeCoreThreadDispatcher {
733        fn new() -> Self {
734            Self {
735                blocks: Mutex::new(vec![]),
736            }
737        }
738
739        fn get_blocks(&self) -> Vec<VerifiedBlock> {
740            self.blocks.lock().clone()
741        }
742    }
743
744    #[async_trait]
745    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
746        async fn add_blocks(
747            &self,
748            blocks: Vec<VerifiedBlock>,
749        ) -> Result<BTreeSet<BlockRef>, CoreError> {
750            let block_refs = blocks.iter().map(|b| b.reference()).collect();
751            self.blocks.lock().extend(blocks);
752            Ok(block_refs)
753        }
754
755        async fn check_block_refs(
756            &self,
757            _block_refs: Vec<BlockRef>,
758        ) -> Result<BTreeSet<BlockRef>, CoreError> {
759            Ok(BTreeSet::new())
760        }
761
762        async fn add_certified_commits(
763            &self,
764            _commits: CertifiedCommits,
765        ) -> Result<BTreeSet<BlockRef>, CoreError> {
766            todo!()
767        }
768
769        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
770            Ok(())
771        }
772
773        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
774            Ok(Default::default())
775        }
776
777        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
778            todo!()
779        }
780
781        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
782            todo!()
783        }
784    }
785
786    #[derive(Default)]
787    struct FakeNetworkClient {}
788
789    #[async_trait]
790    impl ValidatorNetworkClient for FakeNetworkClient {
791        async fn send_block(
792            &self,
793            _peer: AuthorityIndex,
794            _block: &VerifiedBlock,
795            _timeout: Duration,
796        ) -> ConsensusResult<()> {
797            unimplemented!("Unimplemented")
798        }
799
800        async fn subscribe_blocks(
801            &self,
802            _peer: AuthorityIndex,
803            _last_received: Round,
804            _timeout: Duration,
805        ) -> ConsensusResult<BlockStream> {
806            unimplemented!("Unimplemented")
807        }
808
809        async fn fetch_blocks(
810            &self,
811            _peer: AuthorityIndex,
812            _block_refs: Vec<BlockRef>,
813            _fetch_after_rounds: Vec<Round>,
814            _fetch_missing_ancestors: bool,
815            _timeout: Duration,
816        ) -> ConsensusResult<Vec<Bytes>> {
817            unimplemented!("Unimplemented")
818        }
819
820        async fn fetch_commits(
821            &self,
822            _peer: AuthorityIndex,
823            _commit_range: CommitRange,
824            _timeout: Duration,
825        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
826            unimplemented!("Unimplemented")
827        }
828
829        async fn fetch_latest_blocks(
830            &self,
831            _peer: AuthorityIndex,
832            _authorities: Vec<AuthorityIndex>,
833            _timeout: Duration,
834        ) -> ConsensusResult<Vec<Bytes>> {
835            unimplemented!("Unimplemented")
836        }
837
838        async fn get_latest_rounds(
839            &self,
840            _peer: AuthorityIndex,
841            _timeout: Duration,
842        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
843            unimplemented!("Unimplemented")
844        }
845    }
846
847    #[async_trait]
848    impl ObserverNetworkClient for FakeNetworkClient {
849        async fn stream_blocks(
850            &self,
851            _peer: crate::network::PeerId,
852            _highest_round_per_authority: Vec<u64>,
853            _timeout: Duration,
854        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
855            unimplemented!("Unimplemented")
856        }
857
858        async fn fetch_blocks(
859            &self,
860            _peer: crate::network::PeerId,
861            _block_refs: Vec<BlockRef>,
862            _fetch_after_rounds: Vec<Round>,
863            _fetch_missing_ancestors: bool,
864            _timeout: Duration,
865        ) -> ConsensusResult<Vec<Bytes>> {
866            unimplemented!("Unimplemented")
867        }
868
869        async fn fetch_commits(
870            &self,
871            _peer: crate::network::PeerId,
872            _commit_range: CommitRange,
873            _timeout: Duration,
874        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
875            unimplemented!("Unimplemented")
876        }
877    }
878
879    #[tokio::test(flavor = "current_thread", start_paused = true)]
880    async fn test_handle_send_block() {
881        let (context, _keys) = Context::new_for_test(4);
882        let context = Arc::new(context);
883        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
884        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
885        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
886        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
887        let fake_client = Arc::new(FakeNetworkClient::default());
888        let network_client = Arc::new(SynchronizerClient::new(
889            context.clone(),
890            Some(fake_client.clone()),
891            Some(fake_client.clone()),
892        ));
893        let store = Arc::new(MemStore::new());
894        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
895        let transaction_vote_tracker =
896            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
897        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
898        let peers_pool = Arc::new(PeersPool::new(context.clone()));
899        let synchronizer = Synchronizer::start(
900            network_client,
901            context.clone(),
902            core_dispatcher.clone(),
903            commit_vote_monitor.clone(),
904            block_verifier.clone(),
905            transaction_vote_tracker.clone(),
906            round_tracker.clone(),
907            dag_state.clone(),
908            peers_pool.clone(),
909            false,
910        );
911        let block_sync_service = Arc::new(BlockSyncService::new(
912            context.clone(),
913            dag_state.clone(),
914            store.clone(),
915        ));
916        let authority_service = Arc::new(AuthorityService::new(
917            context.clone(),
918            block_verifier,
919            commit_vote_monitor,
920            round_tracker,
921            synchronizer,
922            core_dispatcher.clone(),
923            rx_block_broadcast,
924            transaction_vote_tracker,
925            dag_state,
926            block_sync_service,
927        ));
928
929        // Test delaying blocks with time drift.
930        let now = context.clock.timestamp_utc_ms();
931        let max_drift = context.parameters.max_forward_time_drift;
932        let input_block = VerifiedBlock::new_for_test(
933            TestBlock::new(9, 0)
934                .set_timestamp_ms(now + max_drift.as_millis() as u64)
935                .build(),
936        );
937
938        let service = authority_service.clone();
939        let serialized = ExtendedSerializedBlock {
940            block: input_block.serialized().clone(),
941            excluded_ancestors: vec![],
942        };
943
944        tokio::spawn({
945            let service = service.clone();
946            let context = context.clone();
947            async move {
948                service
949                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
950                    .await
951                    .unwrap();
952            }
953        });
954
955        sleep(max_drift / 2).await;
956
957        let blocks = core_dispatcher.get_blocks();
958        assert_eq!(blocks.len(), 1);
959        assert_eq!(blocks[0], input_block);
960
961        // Test invalid block.
962        let invalid_block =
963            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
964        let extended_block = ExtendedSerializedBlock {
965            block: invalid_block.serialized().clone(),
966            excluded_ancestors: vec![],
967        };
968        service
969            .handle_send_block(
970                context.committee.to_authority_index(0).unwrap(),
971                extended_block,
972            )
973            .await
974            .unwrap_err();
975
976        // Test invalid excluded ancestors.
977        let invalid_excluded_ancestors = vec![
978            bcs::to_bytes(&BlockRef::new(
979                10,
980                AuthorityIndex::new_for_test(1000),
981                BlockDigest::MIN,
982            ))
983            .unwrap(),
984            vec![3u8; 40],
985            bcs::to_bytes(&invalid_block.reference()).unwrap(),
986        ];
987        let extended_block = ExtendedSerializedBlock {
988            block: input_block.serialized().clone(),
989            excluded_ancestors: invalid_excluded_ancestors,
990        };
991        service
992            .handle_send_block(
993                context.committee.to_authority_index(0).unwrap(),
994                extended_block,
995            )
996            .await
997            .unwrap_err();
998    }
999
1000    #[tokio::test(flavor = "current_thread", start_paused = true)]
1001    async fn test_handle_fetch_blocks() {
1002        // GIVEN
1003        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
1004        const NUM_AUTHORITIES: usize = 40;
1005        const NUM_ROUNDS: usize = 40;
1006        let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1007        context.parameters.max_blocks_per_fetch = 50;
1008        let context = Arc::new(context);
1009        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1010        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1011        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1012        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1013        let fake_client = Arc::new(FakeNetworkClient::default());
1014        let network_client = Arc::new(SynchronizerClient::new(
1015            context.clone(),
1016            Some(fake_client.clone()),
1017            Some(fake_client.clone()),
1018        ));
1019        let store = Arc::new(MemStore::new());
1020        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1021        let transaction_vote_tracker =
1022            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1023        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1024        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1025        let synchronizer = Synchronizer::start(
1026            network_client,
1027            context.clone(),
1028            core_dispatcher.clone(),
1029            commit_vote_monitor.clone(),
1030            block_verifier.clone(),
1031            transaction_vote_tracker.clone(),
1032            round_tracker.clone(),
1033            dag_state.clone(),
1034            peers_pool.clone(),
1035            false,
1036        );
1037        let block_sync_service = Arc::new(BlockSyncService::new(
1038            context.clone(),
1039            dag_state.clone(),
1040            store.clone(),
1041        ));
1042        let authority_service = Arc::new(AuthorityService::new(
1043            context.clone(),
1044            block_verifier,
1045            commit_vote_monitor,
1046            round_tracker,
1047            synchronizer,
1048            core_dispatcher.clone(),
1049            rx_block_broadcast,
1050            transaction_vote_tracker,
1051            dag_state.clone(),
1052            block_sync_service,
1053        ));
1054
1055        // GIVEN: 40 rounds of blocks in the dag state.
1056        let mut dag_builder = DagBuilder::new(context.clone());
1057        dag_builder
1058            .layers(1..=(NUM_ROUNDS as u32))
1059            .build()
1060            .persist_layers(dag_state.clone());
1061        dag_state.write().flush();
1062        let all_blocks = dag_builder.all_blocks();
1063
1064        // WHEN: Request 2 blocks from round 40, fetch missing ancestors enabled.
1065        let missing_block_refs: Vec<BlockRef> = all_blocks
1066            .iter()
1067            .rev()
1068            .take(2)
1069            .map(|b| b.reference())
1070            .collect();
1071        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1072        let results = authority_service
1073            .handle_fetch_blocks(
1074                AuthorityIndex::new_for_test(0),
1075                missing_block_refs.clone(),
1076                highest_accepted_rounds,
1077                true,
1078            )
1079            .await
1080            .unwrap();
1081
1082        // THEN: the expected number of unique blocks are returned.
1083        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1084            .iter()
1085            .map(|b| {
1086                let signed = bcs::from_bytes(b).unwrap();
1087                let block = VerifiedBlock::new_verified(signed, b.clone());
1088                (block.reference(), block)
1089            })
1090            .collect();
1091        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1092        // All missing blocks are returned.
1093        for b in &missing_block_refs {
1094            assert!(blocks.contains_key(b));
1095        }
1096        let num_missing_ancestors = blocks
1097            .keys()
1098            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1099            .count();
1100        assert_eq!(
1101            num_missing_ancestors,
1102            context.parameters.max_blocks_per_sync - missing_block_refs.len()
1103        );
1104
1105        // WHEN: Request 2 blocks from round 37, fetch missing ancestors disabled.
1106        let missing_round = NUM_ROUNDS as Round - 3;
1107        let missing_block_refs: Vec<BlockRef> = all_blocks
1108            .iter()
1109            .filter(|b| b.reference().round == missing_round)
1110            .map(|b| b.reference())
1111            .take(2)
1112            .collect();
1113        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1114        // Try to fill up the blocks from the 1st authority in missing_block_refs.
1115        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1116        let results = authority_service
1117            .handle_fetch_blocks(
1118                AuthorityIndex::new_for_test(0),
1119                missing_block_refs.clone(),
1120                highest_accepted_rounds,
1121                false,
1122            )
1123            .await
1124            .unwrap();
1125
1126        // THEN: the expected number of unique blocks are returned.
1127        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1128            .iter()
1129            .map(|b| {
1130                let signed = bcs::from_bytes(b).unwrap();
1131                let block = VerifiedBlock::new_verified(signed, b.clone());
1132                (block.reference(), block)
1133            })
1134            .collect();
1135        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1136        // All missing blocks are returned.
1137        for b in &missing_block_refs {
1138            assert!(blocks.contains_key(b));
1139        }
1140        // Ancestor blocks are from the expected rounds and authorities.
1141        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1142        for b in blocks.keys() {
1143            assert!(b.round <= missing_round);
1144            assert!(expected_authors.contains(&b.author));
1145        }
1146
1147        // WHEN: Request with empty block_refs, fetch missing ancestors disabled.
1148        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1149        // Set a few authorities to higher accepted rounds.
1150        highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
1151        highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
1152        let results = authority_service
1153            .handle_fetch_blocks(
1154                AuthorityIndex::new_for_test(0),
1155                vec![],
1156                highest_accepted_rounds.clone(),
1157                false,
1158            )
1159            .await
1160            .unwrap();
1161
1162        // THEN: the expected number of unique blocks are returned.
1163        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1164            .iter()
1165            .map(|b| {
1166                let signed = bcs::from_bytes(b).unwrap();
1167                let block = VerifiedBlock::new_verified(signed, b.clone());
1168                (block.reference(), block)
1169            })
1170            .collect();
1171        assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
1172        // Blocks should be from all authorities, within the expected round range.
1173        for block_ref in blocks.keys() {
1174            let accepted = highest_accepted_rounds[block_ref.author];
1175            assert!(block_ref.round > accepted);
1176        }
1177        // Blocks should be fetched in ascending round order across authorities,
1178        // so blocks should have low rounds near the accepted rounds.
1179        let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
1180        // With 40 authorities mostly at accepted round 1 and max_blocks_per_fetch=50,
1181        // the min-heap fills ~1-2 rounds per authority.
1182        assert!(
1183            max_round_in_result <= 4,
1184            "Expected low rounds from fair round-order fetching, got max round {}",
1185            max_round_in_result
1186        );
1187
1188        // WHEN: Request 5 block from round 40, not getting ancestors.
1189        let missing_block_refs: Vec<BlockRef> = all_blocks
1190            .iter()
1191            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1192            .map(|b| b.reference())
1193            .take(5)
1194            .collect();
1195        let results = authority_service
1196            .handle_fetch_blocks(
1197                AuthorityIndex::new_for_test(0),
1198                missing_block_refs.clone(),
1199                vec![],
1200                false,
1201            )
1202            .await
1203            .unwrap();
1204
1205        // THEN: the expected number of unique blocks are returned.
1206        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1207            .iter()
1208            .map(|b| {
1209                let signed = bcs::from_bytes(b).unwrap();
1210                let block = VerifiedBlock::new_verified(signed, b.clone());
1211                (block.reference(), block)
1212            })
1213            .collect();
1214        assert_eq!(blocks.len(), 5);
1215        for b in &missing_block_refs {
1216            assert!(blocks.contains_key(b));
1217        }
1218    }
1219
1220    #[tokio::test(flavor = "current_thread", start_paused = true)]
1221    async fn test_handle_fetch_latest_blocks() {
1222        // GIVEN
1223        let (context, _keys) = Context::new_for_test(4);
1224        let context = Arc::new(context);
1225        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1226        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1227        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1228        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1229        let fake_client = Arc::new(FakeNetworkClient::default());
1230        let network_client = Arc::new(SynchronizerClient::new(
1231            context.clone(),
1232            Some(fake_client.clone()),
1233            Some(fake_client.clone()),
1234        ));
1235        let store = Arc::new(MemStore::new());
1236        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1237        let transaction_vote_tracker =
1238            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1239        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1240        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1241        let synchronizer = Synchronizer::start(
1242            network_client,
1243            context.clone(),
1244            core_dispatcher.clone(),
1245            commit_vote_monitor.clone(),
1246            block_verifier.clone(),
1247            transaction_vote_tracker.clone(),
1248            round_tracker.clone(),
1249            dag_state.clone(),
1250            peers_pool.clone(),
1251            true,
1252        );
1253        let block_sync_service = Arc::new(BlockSyncService::new(
1254            context.clone(),
1255            dag_state.clone(),
1256            store.clone(),
1257        ));
1258        let authority_service = Arc::new(AuthorityService::new(
1259            context.clone(),
1260            block_verifier,
1261            commit_vote_monitor,
1262            round_tracker,
1263            synchronizer,
1264            core_dispatcher.clone(),
1265            rx_block_broadcast,
1266            transaction_vote_tracker,
1267            dag_state.clone(),
1268            block_sync_service,
1269        ));
1270
1271        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
1272        let mut dag_builder = DagBuilder::new(context.clone());
1273        dag_builder
1274            .layers(1..=10)
1275            .authorities(vec![AuthorityIndex::new_for_test(2)])
1276            .equivocate(1)
1277            .build()
1278            .persist_layers(dag_state);
1279
1280        // WHEN
1281        let authorities_to_request = vec![
1282            AuthorityIndex::new_for_test(1),
1283            AuthorityIndex::new_for_test(2),
1284        ];
1285        let results = authority_service
1286            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1287            .await;
1288
1289        // THEN
1290        let serialised_blocks = results.unwrap();
1291        for serialised_block in serialised_blocks {
1292            let signed_block: SignedBlock =
1293                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1294            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1295
1296            assert_eq!(verified_block.round(), 10);
1297        }
1298    }
1299
1300    #[tokio::test(flavor = "current_thread", start_paused = true)]
1301    async fn test_handle_subscribe_blocks() {
1302        let (context, _keys) = Context::new_for_test(4);
1303        let context = Arc::new(context);
1304        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1305        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1306        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1307        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1308        let fake_client = Arc::new(FakeNetworkClient::default());
1309        let network_client = Arc::new(SynchronizerClient::new(
1310            context.clone(),
1311            Some(fake_client.clone()),
1312            Some(fake_client.clone()),
1313        ));
1314        let store = Arc::new(MemStore::new());
1315        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1316        let transaction_vote_tracker =
1317            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1318        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1319        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1320        let synchronizer = Synchronizer::start(
1321            network_client,
1322            context.clone(),
1323            core_dispatcher.clone(),
1324            commit_vote_monitor.clone(),
1325            block_verifier.clone(),
1326            transaction_vote_tracker.clone(),
1327            round_tracker.clone(),
1328            dag_state.clone(),
1329            peers_pool.clone(),
1330            false,
1331        );
1332        let block_sync_service = Arc::new(BlockSyncService::new(
1333            context.clone(),
1334            dag_state.clone(),
1335            store.clone(),
1336        ));
1337
1338        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
1339        dag_state
1340            .write()
1341            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1342        dag_state
1343            .write()
1344            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1345        dag_state
1346            .write()
1347            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1348
1349        let authority_service = Arc::new(AuthorityService::new(
1350            context.clone(),
1351            block_verifier,
1352            commit_vote_monitor,
1353            round_tracker,
1354            synchronizer,
1355            core_dispatcher.clone(),
1356            rx_block_broadcast,
1357            transaction_vote_tracker,
1358            dag_state.clone(),
1359            block_sync_service,
1360        ));
1361
1362        let peer = context.committee.to_authority_index(1).unwrap();
1363
1364        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
1365        // Should return last proposed block (round 15) as fallback
1366        {
1367            let mut stream = authority_service
1368                .handle_subscribe_blocks(peer, 100)
1369                .await
1370                .unwrap();
1371            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1372            assert_eq!(
1373                block.round(),
1374                15,
1375                "Should return last proposed block as fallback"
1376            );
1377            assert_eq!(block.author().value(), 0);
1378        }
1379
1380        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
1381        // Should return cached blocks from round 8+
1382        {
1383            let mut stream = authority_service
1384                .handle_subscribe_blocks(peer, 7)
1385                .await
1386                .unwrap();
1387
1388            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1389            assert_eq!(block1.round(), 10, "Should return block at round 10");
1390
1391            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1392            assert_eq!(block2.round(), 15, "Should return block at round 15");
1393        }
1394    }
1395
1396    #[tokio::test(flavor = "current_thread", start_paused = true)]
1397    async fn test_handle_subscribe_blocks_not_proposed() {
1398        let (context, _keys) = Context::new_for_test(4);
1399        let context = Arc::new(context);
1400        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1401        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1402        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1403        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1404        let fake_client = Arc::new(FakeNetworkClient::default());
1405        let network_client = Arc::new(SynchronizerClient::new(
1406            context.clone(),
1407            Some(fake_client.clone()),
1408            Some(fake_client.clone()),
1409        ));
1410        let store = Arc::new(MemStore::new());
1411        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1412        let transaction_vote_tracker =
1413            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1414        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1415        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1416        let synchronizer = Synchronizer::start(
1417            network_client,
1418            context.clone(),
1419            core_dispatcher.clone(),
1420            commit_vote_monitor.clone(),
1421            block_verifier.clone(),
1422            transaction_vote_tracker.clone(),
1423            round_tracker.clone(),
1424            dag_state.clone(),
1425            peers_pool.clone(),
1426            false,
1427        );
1428        let block_sync_service = Arc::new(BlockSyncService::new(
1429            context.clone(),
1430            dag_state.clone(),
1431            store.clone(),
1432        ));
1433
1434        // No blocks added to DagState - only genesis exists
1435
1436        let authority_service = Arc::new(AuthorityService::new(
1437            context.clone(),
1438            block_verifier,
1439            commit_vote_monitor,
1440            round_tracker,
1441            synchronizer,
1442            core_dispatcher.clone(),
1443            rx_block_broadcast,
1444            transaction_vote_tracker,
1445            dag_state.clone(),
1446            block_sync_service,
1447        ));
1448
1449        let peer = context.committee.to_authority_index(1).unwrap();
1450
1451        // Subscribe - no blocks have been proposed yet (only genesis exists)
1452        let mut stream = authority_service
1453            .handle_subscribe_blocks(peer, 0)
1454            .await
1455            .unwrap();
1456
1457        // Should NOT receive any block (genesis must not be returned)
1458        use futures::poll;
1459        use std::task::Poll;
1460        let poll_result = poll!(stream.next());
1461        assert!(
1462            matches!(poll_result, Poll::Pending),
1463            "Should not receive genesis block on subscription stream"
1464        );
1465    }
1466}