consensus_core/
authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    pin::Pin,
7    sync::Arc,
8    time::Duration,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use consensus_types::block::{BlockRef, Round};
15use futures::{Stream, StreamExt, ready, stream, task};
16use mysten_metrics::spawn_monitored_task;
17use parking_lot::RwLock;
18use rand::seq::SliceRandom as _;
19use sui_macros::fail_point_async;
20use tap::TapFallible;
21use tokio::sync::broadcast;
22use tokio_util::sync::ReusableBoxFuture;
23use tracing::{debug, info, warn};
24
25use crate::{
26    CommitIndex,
27    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
28    block_verifier::BlockVerifier,
29    commit::{CommitAPI as _, CommitRange, TrustedCommit},
30    commit_vote_monitor::CommitVoteMonitor,
31    context::Context,
32    core_thread::CoreThreadDispatcher,
33    dag_state::DagState,
34    error::{ConsensusError, ConsensusResult},
35    network::{
36        BlockRequestStream, BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream,
37        ObserverNetworkService, ValidatorNetworkService,
38    },
39    round_tracker::RoundTracker,
40    stake_aggregator::{QuorumThreshold, StakeAggregator},
41    storage::Store,
42    synchronizer::SynchronizerHandle,
43    transaction_certifier::TransactionCertifier,
44};
45
/// Multiplier applied to `commit_sync_batch_size` to derive the commit-lag threshold:
/// incoming blocks are rejected in `handle_send_block` when the local commit index trails
/// the quorum commit index by more than this many batches.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
47
/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    // Shared node context: committee, parameters, protocol config, clock and metrics.
    context: Arc<Context>,
    // Observes commit votes carried by received blocks, to estimate the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Verifies and votes on incoming signed blocks before they are accepted.
    block_verifier: Arc<dyn BlockVerifier>,
    // Used to schedule background fetches of missing (excluded) ancestors.
    synchronizer: Arc<SynchronizerHandle>,
    // Dispatches verified blocks and block-ref checks to the Core thread.
    core_dispatcher: Arc<C>,
    // Broadcast channel of own proposed blocks; resubscribed per peer subscription.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    // Tracks the number of active block-stream subscriptions per peer.
    subscription_counter: Arc<SubscriptionCounter>,
    // Records this node's transaction votes on verified blocks.
    transaction_certifier: TransactionCertifier,
    // Local DAG state: cached blocks, last proposed block, last commit index.
    dag_state: Arc<RwLock<DagState>>,
    // Persistent storage, used to serve commits, commit votes and blocks.
    store: Arc<dyn Store>,
    // Tracks highest received/accepted rounds per authority.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
62
63impl<C: CoreThreadDispatcher> AuthorityService<C> {
64    pub(crate) fn new(
65        context: Arc<Context>,
66        block_verifier: Arc<dyn BlockVerifier>,
67        commit_vote_monitor: Arc<CommitVoteMonitor>,
68        round_tracker: Arc<RwLock<RoundTracker>>,
69        synchronizer: Arc<SynchronizerHandle>,
70        core_dispatcher: Arc<C>,
71        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
72        transaction_certifier: TransactionCertifier,
73        dag_state: Arc<RwLock<DagState>>,
74        store: Arc<dyn Store>,
75    ) -> Self {
76        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
77        Self {
78            context,
79            block_verifier,
80            commit_vote_monitor,
81            synchronizer,
82            core_dispatcher,
83            rx_block_broadcast,
84            subscription_counter,
85            transaction_certifier,
86            dag_state,
87            store,
88            round_tracker,
89        }
90    }
91
92    // Parses and validates serialized excluded ancestors.
93    fn parse_excluded_ancestors(
94        &self,
95        peer: AuthorityIndex,
96        block: &VerifiedBlock,
97        mut excluded_ancestors: Vec<Vec<u8>>,
98    ) -> ConsensusResult<Vec<BlockRef>> {
99        let peer_hostname = &self.context.committee.authority(peer).hostname;
100
101        let excluded_ancestors_limit = self.context.committee.size() * 2;
102        if excluded_ancestors.len() > excluded_ancestors_limit {
103            debug!(
104                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
105                excluded_ancestors.len() - excluded_ancestors_limit,
106                peer,
107                peer_hostname,
108            );
109            excluded_ancestors.truncate(excluded_ancestors_limit);
110        }
111
112        let excluded_ancestors = excluded_ancestors
113            .into_iter()
114            .map(|serialized| {
115                let block_ref: BlockRef =
116                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
117                if !self.context.committee.is_valid_index(block_ref.author) {
118                    return Err(ConsensusError::InvalidAuthorityIndex {
119                        index: block_ref.author,
120                        max: self.context.committee.size(),
121                    });
122                }
123                if block_ref.round >= block.round() {
124                    return Err(ConsensusError::InvalidAncestorRound {
125                        ancestor: block_ref.round,
126                        block: block.round(),
127                    });
128                }
129                Ok(block_ref)
130            })
131            .collect::<ConsensusResult<Vec<BlockRef>>>()?;
132
133        for excluded_ancestor in &excluded_ancestors {
134            let excluded_ancestor_hostname = &self
135                .context
136                .committee
137                .authority(excluded_ancestor.author)
138                .hostname;
139            self.context
140                .metrics
141                .node_metrics
142                .network_excluded_ancestors_count_by_authority
143                .with_label_values(&[excluded_ancestor_hostname])
144                .inc();
145        }
146        self.context
147            .metrics
148            .node_metrics
149            .network_received_excluded_ancestors_from_authority
150            .with_label_values(&[peer_hostname])
151            .inc_by(excluded_ancestors.len() as u64);
152
153        Ok(excluded_ancestors)
154    }
155}
156
157#[async_trait]
158impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
    /// Handles a block pushed directly from `peer`, who must be the block's author.
    ///
    /// The block is deserialized and verified, its commit votes are observed, round
    /// tracking is updated, and it is sent to Core for acceptance into the DAG. Missing
    /// ancestors and missing excluded ancestors are scheduled for background fetching.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing parsing and validations.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        // Malformed excluded ancestors fail the whole request, same as an invalid block.
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // Record how far ahead of local time the block's timestamp is (0 when not ahead,
        // thanks to saturating_sub).
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the block for the commit votes. When local commit is lagging too much,
        // commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Update own received rounds and peer accepted rounds from this verified block.
        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject blocks when local commit index is lagging too far from quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // The block is verified and current, so record own votes on the block
        // before sending the block to Core.
        if self.context.protocol_config.transaction_voting_enabled() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Send the block to Core to try accepting it into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Schedule fetching missing soft links from this peer in the background.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // Best-effort, like the missing-ancestor fetch above.
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }
343
344    async fn handle_subscribe_blocks(
345        &self,
346        peer: AuthorityIndex,
347        last_received: Round,
348    ) -> ConsensusResult<BlockStream> {
349        fail_point_async!("consensus-rpc-response");
350
351        // Find past proposed blocks as the initial blocks to send to the peer.
352        //
353        // If there are cached blocks in the range which the peer requested, send all of them.
354        // The size is bounded by the local GC round and DagState cache size.
355        //
356        // Otherwise if there is no cached block in the range which the peer requested,
357        // and this node has proposed blocks before, at least one block should be sent to the peer
358        // to help with liveness.
359        let past_proposed_blocks = {
360            let dag_state = self.dag_state.read();
361
362            let mut proposed_blocks =
363                dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
364            if proposed_blocks.is_empty() {
365                let last_proposed_block = dag_state.get_last_proposed_block();
366                proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
367                    vec![last_proposed_block]
368                } else {
369                    vec![]
370                };
371            }
372            stream::iter(
373                proposed_blocks
374                    .into_iter()
375                    .map(|block| ExtendedSerializedBlock {
376                        block: block.serialized().clone(),
377                        excluded_ancestors: vec![],
378                    }),
379            )
380        };
381
382        let broadcasted_blocks = BroadcastedBlockStream::new(
383            peer,
384            self.rx_block_broadcast.resubscribe(),
385            self.subscription_counter.clone(),
386        );
387
388        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
389        Ok(Box::pin(past_proposed_blocks.chain(
390            broadcasted_blocks.map(ExtendedSerializedBlock::from),
391        )))
392    }
393
    // Handles two types of requests:
    // 1. Missing block for block sync:
    //    - uses highest_accepted_rounds.
    //    - max_blocks_per_sync blocks should be returned.
    // 2. Committed block for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - max_blocks_per_fetch blocks should be returned.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // When provided, highest_accepted_rounds must have one entry per committee member.
        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Some quick validation of the requested block refs
        let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get requested blocks from store.
        let blocks = if !highest_accepted_rounds.is_empty() {
            // Block sync path: dedup refs, then pad the response with extra useful blocks.
            block_refs.sort();
            block_refs.dedup();
            let mut blocks = self
                .dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            if breadth_first {
                // Get unique missing ancestor blocks of the requested blocks.
                let mut missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
                if selected_num_blocks < missing_ancestors.len() {
                    missing_ancestors = missing_ancestors
                        .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                }
                let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
                blocks.extend(ancestor_blocks.into_iter().flatten());
            } else {
                // Get additional blocks from authorities with missing block, if they are available in cache.
                // Compute the lowest missing round per requested authority.
                let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                for block_ref in blocks.iter().map(|b| b.reference()) {
                    let entry = lowest_missing_rounds
                        .entry(block_ref.author)
                        .or_insert(block_ref.round);
                    *entry = (*entry).min(block_ref.round);
                }

                // Retrieve additional blocks per authority, from peer's highest accepted round + 1 to
                // lowest missing round (exclusive) per requested authority.
                // No block from other authorities are retrieved. It is possible that the requestor is not
                // seeing missing block from another authority, and serving a block would just lead to unnecessary
                // data transfer. Or missing blocks from other authorities are requested from other peers.
                let dag_state = self.dag_state.read();
                for (authority, lowest_missing_round) in lowest_missing_rounds {
                    let highest_accepted_round = highest_accepted_rounds[authority];
                    // Nothing to fill in when the peer already has everything below the gap.
                    if highest_accepted_round >= lowest_missing_round {
                        continue;
                    }
                    let missing_blocks = dag_state.get_cached_blocks_in_range(
                        authority,
                        highest_accepted_round + 1,
                        lowest_missing_round,
                        self.context
                            .parameters
                            .max_blocks_per_sync
                            .saturating_sub(blocks.len()),
                    );
                    blocks.extend(missing_blocks);
                    // Stop once the response budget is reached.
                    if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                        blocks.truncate(self.context.parameters.max_blocks_per_sync);
                        break;
                    }
                }
            }

            blocks
        } else {
            // Commit sync path: serve exactly the requested refs that exist locally.
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        };

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }
531
532    async fn handle_fetch_commits(
533        &self,
534        _peer: AuthorityIndex,
535        commit_range: CommitRange,
536    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
537        fail_point_async!("consensus-rpc-response");
538
539        // Compute an inclusive end index and bound the maximum number of commits scanned.
540        let inclusive_end = commit_range.end().min(
541            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
542                - 1,
543        );
544        let mut commits = self
545            .store
546            .scan_commits((commit_range.start()..=inclusive_end).into())?;
547        let mut certifier_block_refs = vec![];
548        'commit: while let Some(c) = commits.last() {
549            let index = c.index();
550            let votes = self.store.read_commit_votes(index)?;
551            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
552            for v in &votes {
553                stake_aggregator.add(v.author, &self.context.committee);
554            }
555            if stake_aggregator.reached_threshold(&self.context.committee) {
556                certifier_block_refs = votes;
557                break 'commit;
558            } else {
559                debug!(
560                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
561                    index,
562                    stake_aggregator.stake(),
563                    stake_aggregator.threshold(&self.context.committee)
564                );
565                self.context
566                    .metrics
567                    .node_metrics
568                    .commit_sync_fetch_commits_handler_uncertified_skipped
569                    .inc();
570                commits.pop();
571            }
572        }
573        let certifier_blocks = self
574            .store
575            .read_blocks(&certifier_block_refs)?
576            .into_iter()
577            .flatten()
578            .collect();
579        Ok((commits, certifier_blocks))
580    }
581
582    async fn handle_fetch_latest_blocks(
583        &self,
584        peer: AuthorityIndex,
585        authorities: Vec<AuthorityIndex>,
586    ) -> ConsensusResult<Vec<Bytes>> {
587        fail_point_async!("consensus-rpc-response");
588
589        if authorities.len() > self.context.committee.size() {
590            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
591        }
592
593        // Ensure that those are valid authorities
594        for authority in &authorities {
595            if !self.context.committee.is_valid_index(*authority) {
596                return Err(ConsensusError::InvalidAuthorityIndex {
597                    index: *authority,
598                    max: self.context.committee.size(),
599                });
600            }
601        }
602
603        // Read from the dag state to find the latest blocks.
604        // TODO: at the moment we don't look into the block manager for suspended blocks. Ideally we
605        // want in the future if we think we would like to tackle the majority of cases.
606        let mut blocks = vec![];
607        let dag_state = self.dag_state.read();
608        for authority in authorities {
609            let block = dag_state.get_last_block_for_authority(authority);
610
611            debug!("Latest block for {authority}: {block:?} as requested from {peer}");
612
613            // no reason to serve back the genesis block - it's equal as if it has not received any block
614            if block.round() != GENESIS_ROUND {
615                blocks.push(block);
616            }
617        }
618
619        // Return the serialised blocks
620        let result = blocks
621            .into_iter()
622            .map(|block| block.serialized().clone())
623            .collect::<Vec<_>>();
624
625        Ok(result)
626    }
627
628    async fn handle_get_latest_rounds(
629        &self,
630        _peer: AuthorityIndex,
631    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
632        fail_point_async!("consensus-rpc-response");
633
634        let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
635
636        let blocks = self
637            .dag_state
638            .read()
639            .get_last_cached_block_per_authority(Round::MAX);
640        let highest_accepted_rounds = blocks
641            .into_iter()
642            .map(|(block, _)| block.round())
643            .collect::<Vec<_>>();
644
645        Ok((highest_received_rounds, highest_accepted_rounds))
646    }
647}
648
649#[async_trait]
650impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
651    async fn handle_stream_blocks(
652        &self,
653        _peer: NodeId,
654        _request_stream: BlockRequestStream,
655    ) -> ConsensusResult<ObserverBlockStream> {
656        // TODO: Implement observer block streaming
657        todo!("Observer block streaming not yet implemented")
658    }
659
660    async fn handle_fetch_blocks(
661        &self,
662        _peer: NodeId,
663        _block_refs: Vec<BlockRef>,
664    ) -> ConsensusResult<Vec<Bytes>> {
665        // TODO: implement observer fetch blocks, similar to validator fetch_blocks but
666        // without highest_accepted_rounds.
667        Err(ConsensusError::NetworkRequest(
668            "Observer fetch blocks not yet implemented".to_string(),
669        ))
670    }
671
672    async fn handle_fetch_commits(
673        &self,
674        _peer: NodeId,
675        _commit_range: CommitRange,
676    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
677        // TODO: implement observer fetch commits, similar to validator fetch_commits.
678        Err(ConsensusError::NetworkRequest(
679            "Observer fetch commits not yet implemented".to_string(),
680        ))
681    }
682}
683
// Subscription bookkeeping protected by the mutex in SubscriptionCounter.
struct Counter {
    // Total number of active subscriptions across all peers.
    count: usize,
    // Number of active subscriptions per authority index.
    subscriptions_by_authority: Vec<usize>,
}
688
/// Atomically counts the number of active subscriptions to the block broadcast stream.
struct SubscriptionCounter {
    // Used to reach the `subscribed_by` gauge and committee hostnames.
    context: Arc<Context>,
    // Guards the subscription counts; also held while the gauge is updated.
    counter: parking_lot::Mutex<Counter>,
}
694
695impl SubscriptionCounter {
696    fn new(context: Arc<Context>) -> Self {
697        // Set the subscribed peers by default to 0
698        for (_, authority) in context.committee.authorities() {
699            context
700                .metrics
701                .node_metrics
702                .subscribed_by
703                .with_label_values(&[authority.hostname.as_str()])
704                .set(0);
705        }
706
707        Self {
708            counter: parking_lot::Mutex::new(Counter {
709                count: 0,
710                subscriptions_by_authority: vec![0; context.committee.size()],
711            }),
712            context,
713        }
714    }
715
716    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
717        let mut counter = self.counter.lock();
718        counter.count += 1;
719        counter.subscriptions_by_authority[peer] += 1;
720
721        let peer_hostname = &self.context.committee.authority(peer).hostname;
722        self.context
723            .metrics
724            .node_metrics
725            .subscribed_by
726            .with_label_values(&[peer_hostname])
727            .set(1);
728
729        Ok(())
730    }
731
732    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
733        let mut counter = self.counter.lock();
734        counter.count -= 1;
735        counter.subscriptions_by_authority[peer] -= 1;
736
737        if counter.subscriptions_by_authority[peer] == 0 {
738            let peer_hostname = &self.context.committee.authority(peer).hostname;
739            self.context
740                .metrics
741                .node_metrics
742                .subscribed_by
743                .with_label_values(&[peer_hostname])
744                .set(0);
745        }
746
747        Ok(())
748    }
749}
750
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
/// One instance exists per subscribed peer; see `handle_subscribe_blocks`.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
754
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    // The subscribing authority; used for logging and subscription accounting.
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}
770
771impl<T: 'static + Clone + Send> BroadcastStream<T> {
772    pub fn new(
773        peer: AuthorityIndex,
774        rx: broadcast::Receiver<T>,
775        subscription_counter: Arc<SubscriptionCounter>,
776    ) -> Self {
777        if let Err(err) = subscription_counter.increment(peer) {
778            match err {
779                ConsensusError::Shutdown => {}
780                _ => panic!("Unexpected error: {err}"),
781            }
782        }
783        Self {
784            peer,
785            inner: ReusableBoxFuture::new(make_recv_future(rx)),
786            subscription_counter,
787        }
788    }
789}
790
791impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
792    type Item = T;
793
794    fn poll_next(
795        mut self: Pin<&mut Self>,
796        cx: &mut task::Context<'_>,
797    ) -> task::Poll<Option<Self::Item>> {
798        let peer = self.peer;
799        let maybe_item = loop {
800            let (result, rx) = ready!(self.inner.poll(cx));
801            self.inner.set(make_recv_future(rx));
802
803            match result {
804                Ok(item) => break Some(item),
805                Err(broadcast::error::RecvError::Closed) => {
806                    info!("Block BroadcastedBlockStream {} closed", peer);
807                    break None;
808                }
809                Err(broadcast::error::RecvError::Lagged(n)) => {
810                    warn!(
811                        "Block BroadcastedBlockStream {} lagged by {} messages",
812                        peer, n
813                    );
814                    continue;
815                }
816            }
817        };
818        task::Poll::Ready(maybe_item)
819    }
820}
821
822impl<T> Drop for BroadcastStream<T> {
823    fn drop(&mut self) {
824        if let Err(err) = self.subscription_counter.decrement(self.peer) {
825            match err {
826                ConsensusError::Shutdown => {}
827                _ => panic!("Unexpected error: {err}"),
828            }
829        }
830    }
831}
832
833async fn make_recv_future<T: Clone>(
834    mut rx: broadcast::Receiver<T>,
835) -> (
836    Result<T, broadcast::error::RecvError>,
837    broadcast::Receiver<T>,
838) {
839    let result = rx.recv().await;
840    (result, rx)
841}
842
843// TODO: add a unit test for BroadcastStream.
844
845#[cfg(test)]
846mod tests {
847    use std::{
848        collections::{BTreeMap, BTreeSet},
849        sync::Arc,
850        time::Duration,
851    };
852
853    use async_trait::async_trait;
854    use bytes::Bytes;
855    use consensus_config::AuthorityIndex;
856    use consensus_types::block::{BlockDigest, BlockRef, Round};
857    use mysten_metrics::monitored_mpsc;
858    use parking_lot::{Mutex, RwLock};
859    use tokio::{sync::broadcast, time::sleep};
860
861    use futures::StreamExt as _;
862
863    use crate::{
864        authority_service::AuthorityService,
865        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
866        commit::{CertifiedCommits, CommitRange},
867        commit_vote_monitor::CommitVoteMonitor,
868        context::Context,
869        core_thread::{CoreError, CoreThreadDispatcher},
870        dag_state::DagState,
871        error::ConsensusResult,
872        network::{
873            BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
874            ValidatorNetworkClient, ValidatorNetworkService,
875        },
876        round_tracker::RoundTracker,
877        storage::mem_store::MemStore,
878        synchronizer::Synchronizer,
879        test_dag_builder::DagBuilder,
880        transaction_certifier::TransactionCertifier,
881    };
    /// Test double for `CoreThreadDispatcher` that records every block passed
    /// to `add_blocks` so tests can inspect what the service forwarded to core.
    struct FakeCoreThreadDispatcher {
        // Blocks received via add_blocks(), in arrival order.
        blocks: Mutex<Vec<VerifiedBlock>>,
    }
885
886    impl FakeCoreThreadDispatcher {
887        fn new() -> Self {
888            Self {
889                blocks: Mutex::new(vec![]),
890            }
891        }
892
893        fn get_blocks(&self) -> Vec<VerifiedBlock> {
894            self.blocks.lock().clone()
895        }
896    }
897
898    #[async_trait]
899    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
900        async fn add_blocks(
901            &self,
902            blocks: Vec<VerifiedBlock>,
903        ) -> Result<BTreeSet<BlockRef>, CoreError> {
904            let block_refs = blocks.iter().map(|b| b.reference()).collect();
905            self.blocks.lock().extend(blocks);
906            Ok(block_refs)
907        }
908
909        async fn check_block_refs(
910            &self,
911            _block_refs: Vec<BlockRef>,
912        ) -> Result<BTreeSet<BlockRef>, CoreError> {
913            Ok(BTreeSet::new())
914        }
915
916        async fn add_certified_commits(
917            &self,
918            _commits: CertifiedCommits,
919        ) -> Result<BTreeSet<BlockRef>, CoreError> {
920            todo!()
921        }
922
923        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
924            Ok(())
925        }
926
927        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
928            Ok(Default::default())
929        }
930
931        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
932            todo!()
933        }
934
935        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
936            todo!()
937        }
938    }
939
    /// Stateless network client stub: it only exists so `SynchronizerClient`
    /// can be constructed; every trait method panics if actually invoked.
    #[derive(Default)]
    struct FakeNetworkClient {}
942
    // All methods below are unreachable stubs: these tests never drive
    // outbound validator network calls through the fake client.
    #[async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1000
    // Observer-side counterpart of the stub above; likewise never invoked.
    #[async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _request_stream: crate::network::BlockRequestStream,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
1030
    /// Exercises `handle_send_block` in three scenarios: a block timestamped
    /// `max_forward_time_drift` in the future (accepted once the drift wait
    /// elapses), a block from an invalid authority (rejected), and a valid
    /// block carrying malformed excluded-ancestor bytes (rejected).
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        // Wire up an AuthorityService over fake core/network components.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift: the block's timestamp is set
        // exactly max_forward_time_drift ahead of the local clock.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Handle the block on a separate task since it waits out the drift;
        // with start_paused = true the paused clock advances while idle.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        sleep(max_drift / 2).await;

        // The block should have been forwarded to the core dispatcher.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block: author index 1000 is outside the 4-member committee.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors: a ref with an out-of-committee
        // author, undecodable bytes, and a ref to the invalid block above.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1150
    /// Exercises `handle_fetch_blocks` limits and traversal modes:
    /// breadth-first ancestors, depth-first ancestors, and no ancestors
    /// (empty highest_accepted_rounds).
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN
        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // GIVEN: 40 rounds of blocks in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        let all_blocks = dag_builder.all_blocks();

        // WHEN: Request 2 blocks from round 40, get ancestors breadth first.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Breadth-first fill: the remaining budget is spent on the previous round.
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: Request 2 blocks from round 37, get ancestors depth first.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Try to fill up the blocks from the 1st authority in missing_block_refs.
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Ancestor blocks are from the expected rounds and authorities.
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: Request 5 blocks from round 30 (NUM_ROUNDS - 10); with empty
        // highest_accepted_rounds no ancestors are fetched.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }
1326
    /// Exercises `handle_fetch_latest_blocks`: with 10 rounds persisted
    /// (including equivocating blocks from authority 2), every returned block
    /// should be from the latest round.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        // NOTE: unlike the other tests, the synchronizer is started with its
        // last flag set to true here.
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN: every returned block deserializes and is from the latest round (10).
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1405
    /// Exercises `handle_subscribe_blocks` catch-up behavior: subscribing past
    /// the last proposed round yields the last proposed block as a fallback,
    /// while subscribing behind it replays the cached own blocks in order.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // Create 3 proposed blocks at rounds 5, 10, 15 for own authority (index 0)
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
        dag_state
            .write()
            .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Case A: Subscribe with last_received = 100 (after all proposed blocks)
        // Should return last proposed block (round 15) as fallback
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 100)
                .await
                .unwrap();
            let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(
                block.round(),
                15,
                "Should return last proposed block as fallback"
            );
            assert_eq!(block.author().value(), 0);
        }

        // Case B: Subscribe with last_received = 7 (includes rounds 10, 15)
        // Should return cached blocks from round 8+
        {
            let mut stream = authority_service
                .handle_subscribe_blocks(peer, 7)
                .await
                .unwrap();

            let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block1.round(), 10, "Should return block at round 10");

            let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
            assert_eq!(block2.round(), 15, "Should return block at round 15");
        }
    }
1500
    /// Exercises `handle_subscribe_blocks` when nothing but genesis exists:
    /// the stream must stay pending instead of yielding the genesis block.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_subscribe_blocks_not_proposed() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );

        // No blocks added to DagState - only genesis exists

        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let peer = context.committee.to_authority_index(1).unwrap();

        // Subscribe - no blocks have been proposed yet (only genesis exists)
        let mut stream = authority_service
            .handle_subscribe_blocks(peer, 0)
            .await
            .unwrap();

        // Should NOT receive any block (genesis must not be returned)
        use futures::poll;
        use std::task::Poll;
        let poll_result = poll!(stream.next());
        assert!(
            matches!(poll_result, Poll::Pending),
            "Should not receive genesis block on subscription stream"
        );
    }
1570}