consensus_core/authority_service.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet},
    pin::Pin,
    sync::Arc,
    time::Duration,
};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use consensus_types::block::{BlockRef, Round};
use futures::{Stream, StreamExt, ready, stream, task};
use mysten_metrics::spawn_monitored_task;
use parking_lot::RwLock;
use rand::seq::SliceRandom as _;
use sui_macros::fail_point_async;
use tap::TapFallible;
use tokio::sync::broadcast;
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex,
    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    round_tracker::PeerRoundTracker,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::SynchronizerHandle,
    transaction_certifier::TransactionCertifier,
};

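/// Multiple of `commit_sync_batch_size` by which the local commit index may lag behind the
/// quorum commit index before incoming blocks are rejected in `handle_send_block()`.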
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    transaction_certifier: TransactionCertifier,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
    round_tracker: Arc<RwLock<PeerRoundTracker>>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        round_tracker: Arc<RwLock<PeerRoundTracker>>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
        transaction_certifier: TransactionCertifier,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcast,
            subscription_counter,
            transaction_certifier,
            dag_state,
            store,
            round_tracker,
        }
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
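    // Handles a block sent directly by a peer: verifies it, records its commit votes and metrics,
    // rejects it when the local commit index lags too far behind the quorum commit index, then
    // tries to accept it into the DAG and schedules background fetches for missing ancestors.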
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[peer_hostname, "handle_send_block", "UnexpectedAuthority"])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing validations.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

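        // Record how far ahead of local time the block timestamp is, to monitor clock drift
        // across authorities.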
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname, "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the commit votes in the block. When the local commit index lags too far behind,
        // the commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Reject blocks when the local commit index is lagging too far behind the quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise the
        // observed quorum commit index will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verification instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore a block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // The block is verified and current, so it can be processed in the fastpath.
        if self.context.protocol_config.mysticeti_fastpath() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Try to accept the block into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // ------------ After processing the block, process the excluded ancestors ------------

        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.round_tracker
            .write()
            .update_from_accepted_block(&ExtendedBlock {
                block: verified_block,
                excluded_ancestors: excluded_ancestors.clone(),
            });

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing soft links from this peer in the background.
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }

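    // Streams own blocks to a subscribed peer: first the own blocks cached in DagState after
    // `last_received`, then newly broadcasted blocks as they are produced.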
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        // Find recent own blocks that have not been received by the peer.
        // If last_received is valid and more blocks have been proposed since then, this call is
        // guaranteed to return at least some recent blocks, which will help with liveness.
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcast.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles two types of requests:
    // 1. Missing block for block sync:
    //    - uses highest_accepted_rounds.
    //    - max_blocks_per_sync blocks should be returned.
    // 2. Committed block for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - max_blocks_per_fetch blocks should be returned.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Bound the number of requested block refs to the maximum response size.
        let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get requested blocks from store.
        let blocks = if !highest_accepted_rounds.is_empty() {
            block_refs.sort();
            block_refs.dedup();
            let mut blocks = self
                .dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            if breadth_first {
                // Get unique missing ancestor blocks of the requested blocks.
                let mut missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
                if selected_num_blocks < missing_ancestors.len() {
                    missing_ancestors = missing_ancestors
                        .choose_multiple(&mut rand::thread_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                }
                let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
                blocks.extend(ancestor_blocks.into_iter().flatten());
            } else {
                // Get additional blocks from authorities with missing blocks, if they are available in the cache.
                // Compute the lowest missing round per requested authority.
                let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                for block_ref in blocks.iter().map(|b| b.reference()) {
                    let entry = lowest_missing_rounds
                        .entry(block_ref.author)
                        .or_insert(block_ref.round);
                    *entry = (*entry).min(block_ref.round);
                }

                // Retrieve additional blocks per authority, from the peer's highest accepted round + 1
                // up to (but excluding) the lowest missing round of that authority.
                // No blocks from other authorities are retrieved. It is possible that the requestor is not
                // missing blocks from another authority, and serving such blocks would just lead to unnecessary
                // data transfer. Or missing blocks from other authorities may be requested from other peers.
                let dag_state = self.dag_state.read();
                for (authority, lowest_missing_round) in lowest_missing_rounds {
                    let highest_accepted_round = highest_accepted_rounds[authority];
                    if highest_accepted_round >= lowest_missing_round {
                        continue;
                    }
                    let missing_blocks = dag_state.get_cached_blocks_in_range(
                        authority,
                        highest_accepted_round + 1,
                        lowest_missing_round,
                        self.context
                            .parameters
                            .max_blocks_per_sync
                            .saturating_sub(blocks.len()),
                    );
                    blocks.extend(missing_blocks);
                    if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                        blocks.truncate(self.context.parameters.max_blocks_per_sync);
                        break;
                    }
                }
            }

            blocks
        } else {
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        };

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

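    // Serves commits in the requested range for commit sync, together with the blocks containing
    // the votes that certify the last returned commit.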
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

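    // Returns the latest (highest round) block of each requested authority, skipping genesis blocks.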
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that those are valid authorities
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended blocks. Ideally we
        // would want to in the future, if we think it would help tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // No reason to serve back the genesis block - it's the same as if the peer has not received any block.
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialised blocks
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

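    // Reports this authority's highest received and highest accepted rounds per authority.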
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        // Own blocks do not go through the core dispatcher, so they need to be set separately.
        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

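/// Tracks the total number of active subscriptions and the number of subscriptions per peer.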
struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

/// Atomically counts the number of active subscriptions to the block broadcast stream,
/// and dispatches commands to Core based on the changes.
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        // Set the subscribed peers by default to 0
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

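    /// Registers a new subscription from `peer`. When the first subscription appears, notifies the
    /// core dispatcher that a subscriber exists.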
    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        counter.subscriptions_by_authority[peer] += 1;

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);

        if counter.count == 1 {
            self.dispatcher
                .set_subscriber_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        Ok(())
    }

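    /// Unregisters a subscription from `peer`. When the last subscription is dropped, notifies the
    /// core dispatcher that no subscriber exists.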
    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        counter.subscriptions_by_authority[peer] -= 1;

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        if counter.count == 0 {
            self.dispatcher
                .set_subscriber_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        Ok(())
    }
}

/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

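/// Receives the next broadcasted value and returns it together with the receiver, so the receiver
/// can be stored and reused across `poll_next()` calls.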
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

// TODO: add a unit test for BroadcastStream. A minimal sketch is included at the end of the tests module below.

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockRef, Round};
    use mysten_metrics::monitored_mpsc;
    use parking_lot::{Mutex, RwLock};
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        authority_service::AuthorityService,
        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_tracker::PeerRoundTracker,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
        transaction_certifier::TransactionCertifier,
    };

    struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(Default::default())
        }

        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier =
            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            false,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state,
            store,
        ));

        // Test handling a block with a timestamp ahead of local time (forward time drift).
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;

        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN
        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier =
            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            false,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // GIVEN: 40 rounds of blocks in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        let all_blocks = dag_builder.all_blocks();

        // WHEN: Request 2 blocks from round 40, get ancestors breadth first.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: Request 2 blocks from round 37, get ancestors depth first.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Try to fill up the blocks from the 1st authority in missing_block_refs.
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Ancestor blocks are from the expected rounds and authorities.
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: Request 5 blocks from round 30, without getting ancestors.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier =
            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            true,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1234}