// consensus_core/authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use consensus_config::AuthorityIndex;
9use consensus_types::block::{BlockRef, Round};
10use futures::{Stream, StreamExt, ready, stream, task};
11use mysten_metrics::spawn_monitored_task;
12use parking_lot::RwLock;
13use sui_macros::fail_point_async;
14use tap::TapFallible;
15use tokio::sync::broadcast;
16use tokio_util::sync::ReusableBoxFuture;
17use tracing::{debug, info, warn};
18
19use crate::{
20    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
21    block_sync_service::BlockSyncService,
22    block_verifier::BlockVerifier,
23    commit::{CommitRange, TrustedCommit},
24    commit_vote_monitor::CommitVoteMonitor,
25    context::Context,
26    core_thread::CoreThreadDispatcher,
27    dag_state::DagState,
28    error::{ConsensusError, ConsensusResult},
29    network::{BlockStream, ExtendedSerializedBlock, PeerId, ValidatorNetworkService},
30    round_tracker::RoundTracker,
31    synchronizer::SynchronizerHandle,
32    transaction_vote_tracker::TransactionVoteTracker,
33};
34
/// Multiplier applied to `commit_sync_batch_size` to derive how far the local commit
/// index may lag behind the observed quorum commit index before incoming blocks are
/// rejected in `handle_send_block`.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
36
/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    // Observes commit votes carried by received blocks, to track quorum commit progress.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Verifies signatures and contents of received blocks, and produces transaction votes.
    block_verifier: Arc<dyn BlockVerifier>,
    // Used to schedule background fetches of missing (excluded) ancestors.
    synchronizer: Arc<SynchronizerHandle>,
    // Forwards verified blocks to Core for acceptance into the DAG.
    core_dispatcher: Arc<C>,
    // Source of locally proposed blocks to broadcast to subscribers.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    // Tracks the number of active block-stream subscriptions per peer.
    subscription_counter: Arc<SubscriptionCounter>,
    // Records own transaction votes on received blocks when transaction voting is enabled.
    transaction_vote_tracker: TransactionVoteTracker,
    dag_state: Arc<RwLock<DagState>>,
    // Tracks received/accepted rounds per authority, updated from verified blocks.
    round_tracker: Arc<RwLock<RoundTracker>>,
    // Serves fetch requests for blocks, commits and latest blocks.
    block_sync_service: Arc<BlockSyncService>,
}
51
52impl<C: CoreThreadDispatcher> AuthorityService<C> {
53    pub(crate) fn new(
54        context: Arc<Context>,
55        block_verifier: Arc<dyn BlockVerifier>,
56        commit_vote_monitor: Arc<CommitVoteMonitor>,
57        round_tracker: Arc<RwLock<RoundTracker>>,
58        synchronizer: Arc<SynchronizerHandle>,
59        core_dispatcher: Arc<C>,
60        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
61        transaction_vote_tracker: TransactionVoteTracker,
62        dag_state: Arc<RwLock<DagState>>,
63        block_sync_service: Arc<BlockSyncService>,
64    ) -> Self {
65        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
66        Self {
67            context,
68            block_verifier,
69            commit_vote_monitor,
70            synchronizer,
71            core_dispatcher,
72            rx_block_broadcast,
73            subscription_counter,
74            transaction_vote_tracker,
75            dag_state,
76            round_tracker,
77            block_sync_service,
78        }
79    }
80
81    // Parses and validates serialized excluded ancestors.
82    fn parse_excluded_ancestors(
83        &self,
84        peer: AuthorityIndex,
85        block: &VerifiedBlock,
86        mut excluded_ancestors: Vec<Vec<u8>>,
87    ) -> ConsensusResult<Vec<BlockRef>> {
88        let peer_hostname = &self.context.committee.authority(peer).hostname;
89
90        let excluded_ancestors_limit = self.context.committee.size() * 2;
91        if excluded_ancestors.len() > excluded_ancestors_limit {
92            debug!(
93                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
94                excluded_ancestors.len() - excluded_ancestors_limit,
95                peer,
96                peer_hostname,
97            );
98            excluded_ancestors.truncate(excluded_ancestors_limit);
99        }
100
101        let excluded_ancestors = excluded_ancestors
102            .into_iter()
103            .map(|serialized| {
104                let block_ref: BlockRef =
105                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
106                if !self.context.committee.is_valid_index(block_ref.author) {
107                    return Err(ConsensusError::InvalidAuthorityIndex {
108                        index: block_ref.author,
109                        max: self.context.committee.size(),
110                    });
111                }
112                if block_ref.round >= block.round() {
113                    return Err(ConsensusError::InvalidAncestorRound {
114                        ancestor: block_ref.round,
115                        block: block.round(),
116                    });
117                }
118                Ok(block_ref)
119            })
120            .collect::<ConsensusResult<Vec<BlockRef>>>()?;
121
122        for excluded_ancestor in &excluded_ancestors {
123            let excluded_ancestor_hostname = &self
124                .context
125                .committee
126                .authority(excluded_ancestor.author)
127                .hostname;
128            self.context
129                .metrics
130                .node_metrics
131                .network_excluded_ancestors_count_by_authority
132                .with_label_values(&[excluded_ancestor_hostname])
133                .inc();
134        }
135        self.context
136            .metrics
137            .node_metrics
138            .network_received_excluded_ancestors_from_authority
139            .with_label_values(&[peer_hostname])
140            .inc_by(excluded_ancestors.len() as u64);
141
142        Ok(excluded_ancestors)
143    }
144}
145
#[async_trait]
impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
    /// Handles a block pushed by `peer`.
    ///
    /// Pipeline: deserialize -> check the author is the sending peer -> verify and vote
    /// -> parse excluded ancestors -> observe commit votes and update round tracking
    /// -> reject if local commits lag the quorum too much -> record own transaction
    /// votes -> hand the block to Core -> schedule background fetches for any missing
    /// (excluded) ancestors.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing parsing and validations.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // Record how far ahead of local time the block's timestamp is (0 when the
        // block's timestamp is not in the future, due to saturating_sub).
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the block for the commit votes. When local commit is lagging too much,
        // commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Update own received rounds and peer accepted rounds from this verified block.
        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject blocks when local commit index is lagging too far from quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // The block is verified and current, so record own votes on the block
        // before sending the block to Core.
        if self.context.protocol_config.transaction_voting_enabled() {
            self.transaction_vote_tracker
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Send the block to Core to try accepting it into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_ancestors, PeerId::Validator(peer))
                    .await
                {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Schedule fetching missing soft links from this peer in the background.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, PeerId::Validator(peer))
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }

    /// Subscribes `peer` to this authority's block broadcast.
    ///
    /// Returns a stream that first yields past proposed blocks after `last_received`
    /// (or at least the latest proposed block for liveness), then newly broadcast blocks.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        // Find past proposed blocks as the initial blocks to send to the peer.
        //
        // If there are cached blocks in the range which the peer requested, send all of them.
        // The size is bounded by the local GC round and DagState cache size.
        //
        // Otherwise if there is no cached block in the range which the peer requested,
        // and this node has proposed blocks before, at least one block should be sent to the peer
        // to help with liveness.
        let past_proposed_blocks = {
            let dag_state = self.dag_state.read();

            let mut proposed_blocks =
                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
            if proposed_blocks.is_empty() {
                let last_proposed_block = dag_state
                    .get_last_proposed_block()
                    .expect("Last proposed block should be returned on validators");
                // Only resend the last proposed block if it is a real (non-genesis) block.
                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
                    vec![last_proposed_block]
                } else {
                    vec![]
                };
            }
            stream::iter(
                proposed_blocks
                    .into_iter()
                    .map(|block| ExtendedSerializedBlock {
                        block: block.serialized().clone(),
                        excluded_ancestors: vec![],
                    }),
            )
        };

        const MAX_BLOCKS_PER_POLL: usize = 1;
        let broadcasted_blocks = BroadcastedBlockStream::new(
            PeerId::Validator(peer),
            self.rx_block_broadcast.resubscribe(),
            MAX_BLOCKS_PER_POLL,
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
        Ok(Box::pin(past_proposed_blocks.chain(
            broadcasted_blocks.flat_map(|items| {
                debug_assert!(
                    items.len() <= MAX_BLOCKS_PER_POLL,
                    "Too many blocks received from broadcast"
                );
                stream::iter(items.into_iter().map(ExtendedSerializedBlock::from))
            }),
        )))
    }

    // Handles 3 types of requests:
    // 1. Live sync:
    //    - Both missing block refs and highest accepted rounds are specified.
    //    - fetch_missing_ancestors is true.
    //    - response returns max_blocks_per_sync blocks.
    // 2. Periodic sync:
    //    - Highest accepted rounds must be specified.
    //    - Missing block refs are optional.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    // 3. Commit sync:
    //    - Missing block refs are specified.
    //    - Highest accepted rounds are empty.
    //    - fetch_missing_ancestors is false (default).
    //    - response returns max_blocks_per_fetch blocks.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // Delegate to BlockSyncService
        self.block_sync_service
            .fetch_blocks(block_refs, fetch_after_rounds, fetch_missing_ancestors)
            .await
    }

    /// Returns commits in `commit_range` along with their certifying blocks.
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Delegate to BlockSyncService
        self.block_sync_service.fetch_commits(commit_range).await
    }

    /// Returns the latest known blocks for the requested `authorities`.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // Delegate to BlockSyncService
        self.block_sync_service
            .fetch_latest_blocks(peer, authorities)
            .await
    }

    /// Reports this node's highest received and highest accepted rounds per authority.
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}
470
/// Internal state of `SubscriptionCounter`: total subscriptions and a per-peer breakdown.
struct Counter {
    // Total number of active subscriptions across all peers.
    count: usize,
    // Active subscription count keyed by peer.
    subscriptions_by_peer: BTreeMap<PeerId, usize>,
}
475
/// Atomically counts the number of active subscriptions to the block broadcast stream.
pub(crate) struct SubscriptionCounter {
    context: Arc<Context>,
    // Mutex-protected counts; updated on stream creation (increment) and drop (decrement).
    counter: parking_lot::Mutex<Counter>,
}
481
482impl SubscriptionCounter {
483    pub(crate) fn new(context: Arc<Context>) -> Self {
484        // Set the subscribed peers by default to 0
485        for (_, authority) in context.committee.authorities() {
486            context
487                .metrics
488                .node_metrics
489                .subscribed_by
490                .with_label_values(&[authority.hostname.as_str()])
491                .set(0);
492        }
493
494        Self {
495            counter: parking_lot::Mutex::new(Counter {
496                count: 0,
497                subscriptions_by_peer: BTreeMap::new(),
498            }),
499            context,
500        }
501    }
502
503    fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
504        let mut counter = self.counter.lock();
505        counter.count += 1;
506        *counter
507            .subscriptions_by_peer
508            .entry(peer.clone())
509            .or_insert(0) += 1;
510
511        match peer {
512            PeerId::Validator(authority) => {
513                let peer_hostname = &self.context.committee.authority(*authority).hostname;
514                self.context
515                    .metrics
516                    .node_metrics
517                    .subscribed_by
518                    .with_label_values(&[peer_hostname])
519                    .set(1);
520            }
521            PeerId::Observer(_) => {
522                self.context
523                    .metrics
524                    .node_metrics
525                    .subscribed_by
526                    .with_label_values(&["observer"])
527                    .inc();
528            }
529        }
530
531        Ok(())
532    }
533
534    fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
535        let mut counter = self.counter.lock();
536        counter.count -= 1;
537        *counter
538            .subscriptions_by_peer
539            .entry(peer.clone())
540            .or_insert(0) -= 1;
541
542        if counter.subscriptions_by_peer[peer] == 0 {
543            match peer {
544                PeerId::Validator(authority) => {
545                    let peer_hostname = &self.context.committee.authority(*authority).hostname;
546                    self.context
547                        .metrics
548                        .node_metrics
549                        .subscribed_by
550                        .with_label_values(&[peer_hostname])
551                        .set(0);
552                }
553                PeerId::Observer(_) => {
554                    self.context
555                        .metrics
556                        .node_metrics
557                        .subscribed_by
558                        .with_label_values(&["observer"])
559                        .dec();
560                }
561            }
562        }
563
564        Ok(())
565    }
566}
567
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
571
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
pub(crate) struct BroadcastStream<T> {
    // Identifies the subscriber; used for logging and subscription accounting.
    peer: PeerId,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Maximum number of items to return per poll.
    max_items_per_poll: usize,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}
589
590impl<T: 'static + Clone + Send> BroadcastStream<T> {
591    pub fn new(
592        peer: PeerId,
593        rx: broadcast::Receiver<T>,
594        max_items_per_poll: usize,
595        subscription_counter: Arc<SubscriptionCounter>,
596    ) -> Self {
597        assert!(max_items_per_poll > 0, "max_items_per_poll must be > 0");
598        if let Err(err) = subscription_counter.increment(&peer) {
599            match err {
600                ConsensusError::Shutdown => {}
601                _ => panic!("Unexpected error: {err}"),
602            }
603        }
604        Self {
605            peer,
606            inner: ReusableBoxFuture::new(make_recv_future(rx)),
607            max_items_per_poll,
608            subscription_counter,
609        }
610    }
611}
612
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    // Each stream item is a batch of up to `max_items_per_poll` broadcast messages.
    type Item = Vec<T>;

    /// Polls the stored receive future. On success, greedily drains additional
    /// already-available items (up to the cap) via `try_recv`, re-arms the future,
    /// and yields the batch. A lagged receiver is only logged, never surfaced as
    /// an error; a closed channel ends the stream.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        loop {
            // Await the in-flight recv; also recovers the receiver for reuse.
            let (result, mut rx) = ready!(self.inner.poll(cx));

            match result {
                Ok(item) => {
                    let mut items = Vec::new();
                    items.push(item);

                    // Drain any additional items that are already available, up to the cap.
                    while items.len() < self.max_items_per_poll {
                        match rx.try_recv() {
                            Ok(item) => items.push(item),
                            Err(broadcast::error::TryRecvError::Empty) => break,
                            Err(broadcast::error::TryRecvError::Closed) => break,
                            Err(broadcast::error::TryRecvError::Lagged(n)) => {
                                warn!(
                                    "Block BroadcastedBlockStream {} lagged by {} messages",
                                    self.peer, n
                                );
                                break;
                            }
                        }
                    }

                    // Re-arm the stored future with the recovered receiver before yielding.
                    self.inner.set(make_recv_future(rx));
                    return task::Poll::Ready(Some(items));
                }
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", self.peer);
                    return task::Poll::Ready(None);
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        self.peer, n
                    );
                    // Re-arm the future and loop to await the next item.
                    self.inner.set(make_recv_future(rx));
                    continue;
                }
            }
        }
    }
}
664
665impl<T> Drop for BroadcastStream<T> {
666    fn drop(&mut self) {
667        if let Err(err) = self.subscription_counter.decrement(&self.peer) {
668            match err {
669                ConsensusError::Shutdown => {}
670                _ => panic!("Unexpected error: {err}"),
671            }
672        }
673    }
674}
675
/// Awaits a single item from `rx` and returns both the outcome and the receiver,
/// so the receiver can be stored and reused across polls via `ReusableBoxFuture`.
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}
685
686// TODO: add a unit test for BroadcastStream.
687
688#[cfg(test)]
689mod tests {
690    use std::{
691        collections::{BTreeMap, BTreeSet},
692        sync::Arc,
693        time::Duration,
694    };
695
696    use async_trait::async_trait;
697    use bytes::Bytes;
698    use consensus_config::AuthorityIndex;
699    use consensus_types::block::{BlockDigest, BlockRef, Round};
700    use parking_lot::{Mutex, RwLock};
701    use tokio::{sync::broadcast, time::sleep};
702
703    use futures::StreamExt as _;
704
705    use crate::{
706        authority_service::AuthorityService,
707        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
708        block_sync_service::BlockSyncService,
709        commit::{CertifiedCommits, CommitRange},
710        commit_vote_monitor::CommitVoteMonitor,
711        context::Context,
712        core_thread::{CoreError, CoreThreadDispatcher},
713        dag_state::DagState,
714        error::ConsensusResult,
715        network::{
716            BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
717            ValidatorNetworkClient, ValidatorNetworkService,
718        },
719        peers_pool::PeersPool,
720        round_tracker::RoundTracker,
721        storage::mem_store::MemStore,
722        synchronizer::Synchronizer,
723        test_dag_builder::DagBuilder,
724        transaction_vote_tracker::TransactionVoteTracker,
725    };
    /// Test double for `CoreThreadDispatcher` that records every block it is given.
    struct FakeCoreThreadDispatcher {
        // Blocks received via `add_blocks`, in arrival order.
        blocks: Mutex<Vec<VerifiedBlock>>,
    }
729
730    impl FakeCoreThreadDispatcher {
731        fn new() -> Self {
732            Self {
733                blocks: Mutex::new(vec![]),
734            }
735        }
736
737        fn get_blocks(&self) -> Vec<VerifiedBlock> {
738            self.blocks.lock().clone()
739        }
740    }
741
742    #[async_trait]
743    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
744        async fn add_blocks(
745            &self,
746            blocks: Vec<VerifiedBlock>,
747        ) -> Result<BTreeSet<BlockRef>, CoreError> {
748            let block_refs = blocks.iter().map(|b| b.reference()).collect();
749            self.blocks.lock().extend(blocks);
750            Ok(block_refs)
751        }
752
753        async fn check_block_refs(
754            &self,
755            _block_refs: Vec<BlockRef>,
756        ) -> Result<BTreeSet<BlockRef>, CoreError> {
757            Ok(BTreeSet::new())
758        }
759
760        async fn add_certified_commits(
761            &self,
762            _commits: CertifiedCommits,
763        ) -> Result<BTreeSet<BlockRef>, CoreError> {
764            todo!()
765        }
766
767        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
768            Ok(())
769        }
770
771        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
772            Ok(Default::default())
773        }
774
775        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
776            todo!()
777        }
778
779        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
780            todo!()
781        }
782    }
783
    /// Network client stub whose methods are all unimplemented; it exists only to
    /// satisfy trait bounds in tests that never exercise the network.
    #[derive(Default)]
    struct FakeNetworkClient {}
786
    // All methods panic via `unimplemented!`; tests using this fake must not hit the network.
    #[async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _fetch_after_rounds: Vec<Round>,
            _fetch_missing_ancestors: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
844
    // Observer-side network stub: mirrors the validator stub above — every
    // method panics so any accidental observer network call fails the test.
    #[async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        // Panics: observer block streaming is not exercised by these tests.
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _highest_round_per_authority: Vec<u64>,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        // Panics: observer block fetching is not exercised by these tests.
        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _fetch_after_rounds: Vec<Round>,
            _fetch_missing_ancestors: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        // Panics: observer commit fetching is not exercised by these tests.
        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
876
    // Verifies `handle_send_block`:
    //  1. a block timestamped in the future (within max_forward_time_drift) is
    //     delayed and then dispatched to the core thread;
    //  2. a block from a non-committee author is rejected;
    //  3. malformed / invalid excluded-ancestor refs cause rejection.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        // Wire an AuthorityService against fake dispatcher and network stubs.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );
        let block_sync_service = Arc::new(BlockSyncService::new(
            context.clone(),
            dag_state.clone(),
            store.clone(),
        ));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state,
            block_sync_service,
        ));

        // Test delaying blocks with time drift: the block's timestamp sits
        // exactly max_forward_time_drift ahead of the (paused) clock.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Handle the future-dated block on a separate task: it is expected to
        // block (sleep) inside handle_send_block until the drift elapses.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        // NOTE(review): with start_paused the tokio clock auto-advances while
        // tasks are idle, so sleeping max_drift / 2 is assumed sufficient for
        // the spawned handler to complete — confirm against handle_send_block's
        // delay logic.
        sleep(max_drift / 2).await;

        // The delayed block must have been forwarded to the core dispatcher.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block: author index 1000 is outside the 4-node committee.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors: an out-of-committee author ref,
        // undecodable bytes, and a ref to the invalid block above.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
997
    // Verifies `handle_fetch_blocks` across four request shapes: explicit refs
    // with ancestor fetching, explicit refs bounded by highest-accepted rounds,
    // an empty-refs "catch-up" request, and explicit refs with no round hints.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN
        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        context.parameters.max_blocks_per_fetch = 50;
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );
        let block_sync_service = Arc::new(BlockSyncService::new(
            context.clone(),
            dag_state.clone(),
            store.clone(),
        ));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            block_sync_service,
        ));

        // GIVEN: 40 rounds of blocks in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        dag_state.write().flush();
        let all_blocks = dag_builder.all_blocks();

        // WHEN: Request 2 blocks from round 40, fetch missing ancestors enabled.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // The remainder of the sync budget is filled with ancestors from the
        // previous round (round 39).
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: Request 2 blocks from round 37, fetch missing ancestors disabled.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Try to fill up the blocks from the 1st authority in missing_block_refs.
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Ancestor blocks are from the expected rounds and authorities.
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: Request with empty block_refs, fetch missing ancestors disabled.
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Set a few authorities to higher accepted rounds.
        highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
        highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                vec![],
                highest_accepted_rounds.clone(),
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
        // Blocks should be from all authorities, within the expected round range.
        for block_ref in blocks.keys() {
            let accepted = highest_accepted_rounds[block_ref.author];
            assert!(block_ref.round > accepted);
        }
        // Blocks should be fetched in ascending round order across authorities,
        // so blocks should have low rounds near the accepted rounds.
        let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
        // With 40 authorities mostly at accepted round 1 and max_blocks_per_fetch=50,
        // the min-heap fills ~1-2 rounds per authority.
        assert!(
            max_round_in_result <= 4,
            "Expected low rounds from fair round-order fetching, got max round {}",
            max_round_in_result
        );

        // WHEN: Request 5 blocks from round 30 (NUM_ROUNDS - 10), with no
        // highest_accepted_rounds hints and ancestor fetching disabled.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: only the 5 explicitly requested blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }
1217
    // Verifies `handle_fetch_latest_blocks` returns the highest-round blocks
    // for the requested authorities, including when an authority equivocates.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        // NOTE: unlike the other tests in this module, the Synchronizer is
        // started with its last flag set to `true` here.
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            true,
        );
        let block_sync_service = Arc::new(BlockSyncService::new(
            context.clone(),
            dag_state.clone(),
            store.clone(),
        ));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            block_sync_service,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN: fetch the latest blocks of authorities 1 and 2.
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN: every returned block is from the last built round (10).
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1297
    // Verifies `handle_subscribe_blocks` replay behavior for own proposed
    // blocks: a subscriber that is fully caught up gets the last proposed
    // block as a fallback, while a lagging subscriber gets all newer blocks.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );
        let block_sync_service = Arc::new(BlockSyncService::new(
            context.clone(),
            dag_state.clone(),
            store.clone(),
        ));

        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            block_sync_service,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
        // Should return last proposed block (round 15) as fallback
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 100)
                .await
                .unwrap();
            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(
                block.round(),
                15,
                "Should return last proposed block as fallback"
            );
            // The fallback block must be authored by own authority (index 0).
            assert_eq!(block.author().value(), 0);
        }

        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
        // Should return cached blocks from round 8+
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 7)
                .await
                .unwrap();

            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block1.round(), 10, "Should return block at round 10");

            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block2.round(), 15, "Should return block at round 15");
        }
    }
1393
    // Verifies that a subscription opened before any block has been proposed
    // yields nothing: the genesis block must never be streamed to peers.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks_not_proposed() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );
        let block_sync_service = Arc::new(BlockSyncService::new(
            context.clone(),
            dag_state.clone(),
            store.clone(),
        ));

        // No blocks added to DagState - only genesis exists

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state.clone(),
            block_sync_service,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Subscribe - no blocks have been proposed yet (only genesis exists)
        let mut stream = authority_service
            .handle_subscribe_blocks(peer, 0)
            .await
            .unwrap();

        // Should NOT receive any block (genesis must not be returned).
        // A single poll (rather than awaiting) is used so the test does not
        // hang on a stream that correctly produces nothing.
        use futures::poll;
        use std::task::Poll;
        let poll_result = poll!(stream.next());
        assert!(
            matches!(poll_result, Poll::Pending),
            "Should not receive genesis block on subscription stream"
        );
    }
1464}